Repository URL to install this package:
|
Version:
3.12.2 ▾
|
pymongo
/
ssl_support.py
|
|---|
# Copyright 2014-present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You
# may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Support for SSL in PyMongo."""
import atexit
import sys
import threading
from bson.py3compat import string_type
from pymongo.errors import ConfigurationError
# Select the TLS backend module: try pymongo.pyopenssl_context first and
# fall back to pymongo.ssl_context. HAVE_SSL is cleared only when neither
# of them can be imported.
HAVE_SSL = True
try:
    import pymongo.pyopenssl_context as _ssl
except ImportError:
    try:
        import pymongo.ssl_context as _ssl
    except ImportError:
        HAVE_SSL = False

# certifi is optional; record whether it could be imported.
try:
    import certifi
except ImportError:
    HAVE_CERTIFI = False
else:
    HAVE_CERTIFI = True

# wincertstore is optional; record whether CertFile could be imported.
try:
    from wincertstore import CertFile
except ImportError:
    HAVE_WINCERTSTORE = False
else:
    HAVE_WINCERTSTORE = True

# Shared CertFile instance (created on first use) and the lock guarding
# its creation.
_WINCERTS = None
_WINCERTSLOCK = threading.Lock()
if HAVE_SSL:
# Note: The validate* functions below deal with users passing
# CPython ssl module constants to configure certificate verification
# at a high level. This is legacy behavior, but requires us to
# import the ssl module even if we're only using it for this purpose.
import ssl as _stdlibssl
from ssl import CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED
# Re-export the selected backend's HAS_SNI flag under this module's name.
HAS_SNI = _ssl.HAS_SNI
# Truthy when the backend reports IS_PYOPENSSL or when running on
# Python 3.7 or newer.
# NOTE(review): presumably signals that IP-address hostnames can be
# verified safely -- confirm against the callers of this flag.
IPADDR_SAFE = _ssl.IS_PYOPENSSL or sys.version_info[:2] >= (3, 7)
# Expose the backend's SSL exception type under a backend-independent name.
SSLError = _ssl.SSLError
def validate_cert_reqs(option, value):
    """Check a certificate-requirements setting and return it.

    ``None`` is passed through unchanged. A string that names an
    attribute of the standard library ``ssl`` module (for example
    ``"CERT_REQUIRED"``) is first replaced by that attribute. The
    resulting value must equal one of ``ssl.CERT_NONE``,
    ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``; otherwise a
    ``ValueError`` mentioning `option` is raised.
    """
    if value is None:
        return None

    resolved = value
    # Legacy behavior: accept the *name* of an ssl module constant.
    if isinstance(resolved, string_type) and hasattr(_stdlibssl, resolved):
        resolved = getattr(_stdlibssl, resolved)

    if resolved not in (CERT_NONE, CERT_OPTIONAL, CERT_REQUIRED):
        raise ValueError("The value of %s must be one of: "
                         "`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
                         "`ssl.CERT_REQUIRED`" % (option,))
    return resolved
def validate_allow_invalid_certs(option, value):
    """Map an "allow invalid certificates" flag to a cert_reqs constant.

    `value` is checked with ``pymongo.common.validate_boolean_or_string``.
    Returns ``CERT_NONE`` when the validated flag is truthy and
    ``CERT_REQUIRED`` otherwise.
    """
    # Imported inside the function to avoid a circular import.
    from pymongo.common import validate_boolean_or_string

    allow_invalid = validate_boolean_or_string(option, value)
    return CERT_NONE if allow_invalid else CERT_REQUIRED
def _load_wincerts():
    """Populate the module-level _WINCERTS with a wincertstore.CertFile.

    The "CA" and "ROOT" stores are added to the CertFile, and its
    ``close`` method is registered to run at interpreter exit.
    """
    global _WINCERTS
    store = CertFile()
    for store_name in ("CA", "ROOT"):
        store.addstore(store_name)
    atexit.register(store.close)
    _WINCERTS = store
def get_ssl_context(*args):
    """Build an SSLContext from the given TLS settings and return it.

    `args` must contain exactly eight positional values, in this order:
    certfile, keyfile, passphrase, ca_certs, cert_reqs, crlfile,
    match_hostname, check_ocsp_endpoint.

    Raises ConfigurationError when loading the client certificate chain
    fails with an SSL error, when a CRL file is requested but cannot be
    supported, or when verification is enabled and no CA certificates
    can be located.
    """
    (certfile, keyfile, passphrase, ca_certs,
     cert_reqs, crlfile, match_hostname, check_ocsp_endpoint) = args

    # An unspecified cert_reqs means certificates are required.
    mode = cert_reqs if cert_reqs is not None else CERT_REQUIRED
    context = _ssl.SSLContext(_ssl.PROTOCOL_SSLv23)

    # SSLContext.check_hostname was added in CPython 2.7.9 and 3.4.
    if hasattr(context, "check_hostname"):
        may_check = _ssl.CHECK_HOSTNAME_SAFE and mode != CERT_NONE
        context.check_hostname = match_hostname if may_check else False

    if hasattr(context, "check_ocsp_endpoint"):
        context.check_ocsp_endpoint = check_ocsp_endpoint

    if hasattr(context, "options"):
        # Explicitly disable SSLv2, SSLv3 and TLS compression. Note that
        # up to date versions of MongoDB 2.4 and above already disable
        # SSLv2 and SSLv3, python disables SSLv2 by default in >= 2.7.7
        # and >= 3.3.4 and SSLv3 in >= 3.4.3.
        for flag in (_ssl.OP_NO_SSLv2,
                     _ssl.OP_NO_SSLv3,
                     _ssl.OP_NO_COMPRESSION,
                     _ssl.OP_NO_RENEGOTIATION):
            context.options |= flag

    if certfile is not None:
        try:
            context.load_cert_chain(certfile, keyfile, passphrase)
        except _ssl.SSLError as exc:
            raise ConfigurationError(
                "Private key doesn't match certificate: %s" % (exc,))

    if crlfile is not None:
        if _ssl.IS_PYOPENSSL:
            raise ConfigurationError(
                "ssl_crlfile cannot be used with PyOpenSSL")
        if not hasattr(context, "verify_flags"):
            raise ConfigurationError(
                "Support for ssl_crlfile requires "
                "python 2.7.9+ (pypy 2.5.1+) or 3.4+")
        # Match the server's behavior.
        context.verify_flags = getattr(_ssl, "VERIFY_CRL_CHECK_LEAF", 0)
        context.load_verify_locations(crlfile)

    if ca_certs is not None:
        context.load_verify_locations(ca_certs)
    elif mode != CERT_NONE:
        # No explicit CA file: try each available source of system or
        # bundled CA certificates in turn.
        on_windows = sys.platform == "win32"
        # CPython >= 2.7.9 or >= 3.4.0, pypy >= 2.5.1
        if hasattr(context, "load_default_certs"):
            context.load_default_certs()
        # Python >= 3.2.0, useless on Windows.
        elif not on_windows and hasattr(context, "set_default_verify_paths"):
            context.set_default_verify_paths()
        elif on_windows and HAVE_WINCERTSTORE:
            # Create the shared CertFile at most once across threads.
            with _WINCERTSLOCK:
                if _WINCERTS is None:
                    _load_wincerts()
            context.load_verify_locations(_WINCERTS.name)
        elif HAVE_CERTIFI:
            context.load_verify_locations(certifi.where())
        else:
            raise ConfigurationError(
                "`ssl_cert_reqs` is not ssl.CERT_NONE and no system "
                "CA certificates could be loaded. `ssl_ca_certs` is "
                "required.")

    context.verify_mode = mode
    return context
else:
class SSLError(Exception):
    """Stand-in SSLError defined when no SSL backend could be imported."""
HAS_SNI = False
IPADDR_SAFE = False
def validate_cert_reqs(option, dummy):
    """Always raise ConfigurationError: there is no ssl module to use.

    `option` is interpolated into the error message; `dummy` is ignored.
    """
    message = ("The value of %s is set but can't be "
               "validated. The ssl module is not available" % (option,))
    raise ConfigurationError(message)
def validate_allow_invalid_certs(option, dummy):
    """Fail exactly like validate_cert_reqs does without an ssl module.

    Both arguments are forwarded unchanged to ``validate_cert_reqs``.
    """
    # Delegate so both validators report the same error.
    return validate_cert_reqs(option, dummy)
def get_ssl_context(*dummy):
    """Always raise ConfigurationError: no SSL context can be created.

    Any positional arguments are accepted and ignored.
    """
    raise ConfigurationError("The ssl module is not available.")