import time
import urllib3
from urllib3.exceptions import ReadTimeoutError, SSLError as UrllibSSLError
import warnings
CA_CERTS = None
try:
import certifi
CA_CERTS = certifi.where()
except ImportError:
pass
from .base import Connection
from ..exceptions import ConnectionError, ImproperlyConfigured, ConnectionTimeout, SSLError
from ..compat import urlencode
class Urllib3HttpConnection(Connection):
"""
Default connection class using the `urllib3` library and the http protocol.
:arg host: hostname of the node (default: localhost)
:arg port: port to use (integer, default: 9200)
:arg url_prefix: optional url prefix for elasticsearch
:arg timeout: default timeout in seconds (float, default: 10)
:arg http_auth: optional http auth information as either ':' separated
string or a tuple
:arg use_ssl: use ssl for the connection if `True`
:arg verify_certs: whether to verify SSL certificates
:arg ca_certs: optional path to CA bundle. See
https://urllib3.readthedocs.io/en/latest/security.html#using-certifi-with-urllib3
for instructions how to get default set
:arg client_cert: path to the file containing the private key and the
certificate, or cert only if using client_key
:arg client_key: path to the file containing the private key if using
separate cert and key files (client_cert will contain only the cert)
:arg ssl_version: version of the SSL protocol to use. Choices are:
SSLv23 (default) SSLv2 SSLv3 TLSv1 (see ``PROTOCOL_*`` constants in the
``ssl`` module for exact options for your environment).
:arg ssl_assert_hostname: use hostname verification if not `False`
:arg ssl_assert_fingerprint: verify the supplied certificate fingerprint if not `None`
:arg maxsize: the number of connections which will be kept open to this
host. See https://urllib3.readthedocs.io/en/1.4/pools.html#api for more
information.
:arg headers: any custom http headers to be add to requests
"""
def __init__(self, host='localhost', port=9200, http_auth=None,
use_ssl=False, verify_certs=True, ca_certs=None, client_cert=None,
client_key=None, ssl_version=None, ssl_assert_hostname=None,
ssl_assert_fingerprint=None, maxsize=10, headers=None, **kwargs):
super(Urllib3HttpConnection, self).__init__(host=host, port=port, use_ssl=use_ssl, **kwargs)
self.headers = urllib3.make_headers(keep_alive=True)
if http_auth is not None:
if isinstance(http_auth, (tuple, list)):
http_auth = ':'.join(http_auth)
self.headers.update(urllib3.make_headers(basic_auth=http_auth))
# update headers in lowercase to allow overriding of auth headers
if headers:
for k in headers:
self.headers[k.lower()] = headers[k]
self.headers.setdefault('content-type', 'application/json')
ca_certs = CA_CERTS if ca_certs is None else ca_certs
pool_class = urllib3.HTTPConnectionPool
kw = {}
if use_ssl:
pool_class = urllib3.HTTPSConnectionPool
kw.update({
'ssl_version': ssl_version,
'assert_hostname': ssl_assert_hostname,
'assert_fingerprint': ssl_assert_fingerprint,
'ca_certs': ca_certs,
'cert_file': client_cert,
'key_file': client_key,
})
if verify_certs:
if not ca_certs:
raise ImproperlyConfigured("Root certificates are missing for certificate "
"validation. Either pass them in using the ca_certs parameter or "
"install certifi to use it automatically.")
kw.update({
'cert_reqs': 'CERT_REQUIRED',
})
else:
warnings.warn(
'Connecting to %s using SSL with verify_certs=False is insecure.' % host)
kw.update({
'cert_reqs': 'CERT_NONE',
})
self.pool = pool_class(host, port=port, timeout=self.timeout, maxsize=maxsize, **kw)
def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
url = self.url_prefix + url
if params:
url = '%s?%s' % (url, urlencode(params))
full_url = self.host + url
start = time.time()
try:
kw = {}
if timeout:
kw['timeout'] = timeout
# in python2 we need to make sure the url and method are not
# unicode. Otherwise the body will be decoded into unicode too and
# that will fail (#133, #201).
if not isinstance(url, str):
url = url.encode('utf-8')
if not isinstance(method, str):
method = method.encode('utf-8')
response = self.pool.urlopen(method, url, body, retries=False, headers=self.headers, **kw)
duration = time.time() - start
raw_data = response.data.decode('utf-8')
except Exception as e:
self.log_request_fail(method, full_url, url, body, time.time() - start, exception=e)
if isinstance(e, UrllibSSLError):
raise SSLError('N/A', str(e), e)
if isinstance(e, ReadTimeoutError):
raise ConnectionTimeout('TIMEOUT', str(e), e)
raise ConnectionError('N/A', str(e), e)
# raise errors based on http status codes, let the client handle those if needed
if not (200 <= response.status < 300) and response.status not in ignore:
self.log_request_fail(method, full_url, url, body, duration, response.status, raw_data)
self._raise_error(response.status, raw_data)
self.log_request_success(method, full_url, url, body, response.status,
raw_data, duration)
return response.status, response.getheaders(), raw_data
def close(self):
"""
Explicitly closes connection
"""
self.pool.close()