Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aroundthecode / pyOpenSSL   python

Repository URL to install this package:

/ SSL.py

import os
import socket
from sys import platform
from functools import wraps, partial
from itertools import count, chain
from weakref import WeakValueDictionary
from errno import errorcode

from cryptography.utils import deprecated

from six import (
    binary_type as _binary_type, integer_types as integer_types, int2byte,
    indexbytes)

from OpenSSL._util import (
    UNSPECIFIED as _UNSPECIFIED,
    exception_from_error_queue as _exception_from_error_queue,
    ffi as _ffi,
    lib as _lib,
    make_assert as _make_assert,
    native as _native,
    path_string as _path_string,
    text_to_bytes_and_warn as _text_to_bytes_and_warn,
    no_zero_allocator as _no_zero_allocator,
)

from OpenSSL.crypto import (
    FILETYPE_PEM, _PassphraseHelper, PKey, X509Name, X509, X509Store)

__all__ = [
    'OPENSSL_VERSION_NUMBER',
    'SSLEAY_VERSION',
    'SSLEAY_CFLAGS',
    'SSLEAY_PLATFORM',
    'SSLEAY_DIR',
    'SSLEAY_BUILT_ON',
    'SENT_SHUTDOWN',
    'RECEIVED_SHUTDOWN',
    'SSLv2_METHOD',
    'SSLv3_METHOD',
    'SSLv23_METHOD',
    'TLSv1_METHOD',
    'TLSv1_1_METHOD',
    'TLSv1_2_METHOD',
    'OP_NO_SSLv2',
    'OP_NO_SSLv3',
    'OP_NO_TLSv1',
    'OP_NO_TLSv1_1',
    'OP_NO_TLSv1_2',
    'MODE_RELEASE_BUFFERS',
    'OP_SINGLE_DH_USE',
    'OP_SINGLE_ECDH_USE',
    'OP_EPHEMERAL_RSA',
    'OP_MICROSOFT_SESS_ID_BUG',
    'OP_NETSCAPE_CHALLENGE_BUG',
    'OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG',
    'OP_SSLREF2_REUSE_CERT_TYPE_BUG',
    'OP_MICROSOFT_BIG_SSLV3_BUFFER',
    'OP_MSIE_SSLV2_RSA_PADDING',
    'OP_SSLEAY_080_CLIENT_DH_BUG',
    'OP_TLS_D5_BUG',
    'OP_TLS_BLOCK_PADDING_BUG',
    'OP_DONT_INSERT_EMPTY_FRAGMENTS',
    'OP_CIPHER_SERVER_PREFERENCE',
    'OP_TLS_ROLLBACK_BUG',
    'OP_PKCS1_CHECK_1',
    'OP_PKCS1_CHECK_2',
    'OP_NETSCAPE_CA_DN_BUG',
    'OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG',
    'OP_NO_COMPRESSION',
    'OP_NO_QUERY_MTU',
    'OP_COOKIE_EXCHANGE',
    'OP_NO_TICKET',
    'OP_ALL',
    'VERIFY_PEER',
    'VERIFY_FAIL_IF_NO_PEER_CERT',
    'VERIFY_CLIENT_ONCE',
    'VERIFY_NONE',
    'SESS_CACHE_OFF',
    'SESS_CACHE_CLIENT',
    'SESS_CACHE_SERVER',
    'SESS_CACHE_BOTH',
    'SESS_CACHE_NO_AUTO_CLEAR',
    'SESS_CACHE_NO_INTERNAL_LOOKUP',
    'SESS_CACHE_NO_INTERNAL_STORE',
    'SESS_CACHE_NO_INTERNAL',
    'SSL_ST_CONNECT',
    'SSL_ST_ACCEPT',
    'SSL_ST_MASK',
    'SSL_CB_LOOP',
    'SSL_CB_EXIT',
    'SSL_CB_READ',
    'SSL_CB_WRITE',
    'SSL_CB_ALERT',
    'SSL_CB_READ_ALERT',
    'SSL_CB_WRITE_ALERT',
    'SSL_CB_ACCEPT_LOOP',
    'SSL_CB_ACCEPT_EXIT',
    'SSL_CB_CONNECT_LOOP',
    'SSL_CB_CONNECT_EXIT',
    'SSL_CB_HANDSHAKE_START',
    'SSL_CB_HANDSHAKE_DONE',
    'Error',
    'WantReadError',
    'WantWriteError',
    'WantX509LookupError',
    'ZeroReturnError',
    'SysCallError',
    'SSLeay_version',
    'Session',
    'Context',
    'Connection'
]

try:
    _buffer = buffer
except NameError:
    class _buffer(object):
        pass

OPENSSL_VERSION_NUMBER = _lib.OPENSSL_VERSION_NUMBER
SSLEAY_VERSION = _lib.SSLEAY_VERSION
SSLEAY_CFLAGS = _lib.SSLEAY_CFLAGS
SSLEAY_PLATFORM = _lib.SSLEAY_PLATFORM
SSLEAY_DIR = _lib.SSLEAY_DIR
SSLEAY_BUILT_ON = _lib.SSLEAY_BUILT_ON

SENT_SHUTDOWN = _lib.SSL_SENT_SHUTDOWN
RECEIVED_SHUTDOWN = _lib.SSL_RECEIVED_SHUTDOWN

SSLv2_METHOD = 1
SSLv3_METHOD = 2
SSLv23_METHOD = 3
TLSv1_METHOD = 4
TLSv1_1_METHOD = 5
TLSv1_2_METHOD = 6

OP_NO_SSLv2 = _lib.SSL_OP_NO_SSLv2
OP_NO_SSLv3 = _lib.SSL_OP_NO_SSLv3
OP_NO_TLSv1 = _lib.SSL_OP_NO_TLSv1
OP_NO_TLSv1_1 = _lib.SSL_OP_NO_TLSv1_1
OP_NO_TLSv1_2 = _lib.SSL_OP_NO_TLSv1_2

MODE_RELEASE_BUFFERS = _lib.SSL_MODE_RELEASE_BUFFERS

OP_SINGLE_DH_USE = _lib.SSL_OP_SINGLE_DH_USE
OP_SINGLE_ECDH_USE = _lib.SSL_OP_SINGLE_ECDH_USE
OP_EPHEMERAL_RSA = _lib.SSL_OP_EPHEMERAL_RSA
OP_MICROSOFT_SESS_ID_BUG = _lib.SSL_OP_MICROSOFT_SESS_ID_BUG
OP_NETSCAPE_CHALLENGE_BUG = _lib.SSL_OP_NETSCAPE_CHALLENGE_BUG
OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG = (
    _lib.SSL_OP_NETSCAPE_REUSE_CIPHER_CHANGE_BUG
)
OP_SSLREF2_REUSE_CERT_TYPE_BUG = _lib.SSL_OP_SSLREF2_REUSE_CERT_TYPE_BUG
OP_MICROSOFT_BIG_SSLV3_BUFFER = _lib.SSL_OP_MICROSOFT_BIG_SSLV3_BUFFER
OP_MSIE_SSLV2_RSA_PADDING = _lib.SSL_OP_MSIE_SSLV2_RSA_PADDING
OP_SSLEAY_080_CLIENT_DH_BUG = _lib.SSL_OP_SSLEAY_080_CLIENT_DH_BUG
OP_TLS_D5_BUG = _lib.SSL_OP_TLS_D5_BUG
OP_TLS_BLOCK_PADDING_BUG = _lib.SSL_OP_TLS_BLOCK_PADDING_BUG
OP_DONT_INSERT_EMPTY_FRAGMENTS = _lib.SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS
OP_CIPHER_SERVER_PREFERENCE = _lib.SSL_OP_CIPHER_SERVER_PREFERENCE
OP_TLS_ROLLBACK_BUG = _lib.SSL_OP_TLS_ROLLBACK_BUG
OP_PKCS1_CHECK_1 = _lib.SSL_OP_PKCS1_CHECK_1
OP_PKCS1_CHECK_2 = _lib.SSL_OP_PKCS1_CHECK_2
OP_NETSCAPE_CA_DN_BUG = _lib.SSL_OP_NETSCAPE_CA_DN_BUG
OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG = (
    _lib.SSL_OP_NETSCAPE_DEMO_CIPHER_CHANGE_BUG
)
OP_NO_COMPRESSION = _lib.SSL_OP_NO_COMPRESSION

OP_NO_QUERY_MTU = _lib.SSL_OP_NO_QUERY_MTU
OP_COOKIE_EXCHANGE = _lib.SSL_OP_COOKIE_EXCHANGE
OP_NO_TICKET = _lib.SSL_OP_NO_TICKET

OP_ALL = _lib.SSL_OP_ALL

VERIFY_PEER = _lib.SSL_VERIFY_PEER
VERIFY_FAIL_IF_NO_PEER_CERT = _lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT
VERIFY_CLIENT_ONCE = _lib.SSL_VERIFY_CLIENT_ONCE
VERIFY_NONE = _lib.SSL_VERIFY_NONE

SESS_CACHE_OFF = _lib.SSL_SESS_CACHE_OFF
SESS_CACHE_CLIENT = _lib.SSL_SESS_CACHE_CLIENT
SESS_CACHE_SERVER = _lib.SSL_SESS_CACHE_SERVER
SESS_CACHE_BOTH = _lib.SSL_SESS_CACHE_BOTH
SESS_CACHE_NO_AUTO_CLEAR = _lib.SSL_SESS_CACHE_NO_AUTO_CLEAR
SESS_CACHE_NO_INTERNAL_LOOKUP = _lib.SSL_SESS_CACHE_NO_INTERNAL_LOOKUP
SESS_CACHE_NO_INTERNAL_STORE = _lib.SSL_SESS_CACHE_NO_INTERNAL_STORE
SESS_CACHE_NO_INTERNAL = _lib.SSL_SESS_CACHE_NO_INTERNAL

SSL_ST_CONNECT = _lib.SSL_ST_CONNECT
SSL_ST_ACCEPT = _lib.SSL_ST_ACCEPT
SSL_ST_MASK = _lib.SSL_ST_MASK
if _lib.Cryptography_HAS_SSL_ST:
    SSL_ST_INIT = _lib.SSL_ST_INIT
    SSL_ST_BEFORE = _lib.SSL_ST_BEFORE
    SSL_ST_OK = _lib.SSL_ST_OK
    SSL_ST_RENEGOTIATE = _lib.SSL_ST_RENEGOTIATE
    __all__.extend([
        'SSL_ST_INIT',
        'SSL_ST_BEFORE',
        'SSL_ST_OK',
        'SSL_ST_RENEGOTIATE',
    ])

SSL_CB_LOOP = _lib.SSL_CB_LOOP
SSL_CB_EXIT = _lib.SSL_CB_EXIT
SSL_CB_READ = _lib.SSL_CB_READ
SSL_CB_WRITE = _lib.SSL_CB_WRITE
SSL_CB_ALERT = _lib.SSL_CB_ALERT
SSL_CB_READ_ALERT = _lib.SSL_CB_READ_ALERT
SSL_CB_WRITE_ALERT = _lib.SSL_CB_WRITE_ALERT
SSL_CB_ACCEPT_LOOP = _lib.SSL_CB_ACCEPT_LOOP
SSL_CB_ACCEPT_EXIT = _lib.SSL_CB_ACCEPT_EXIT
SSL_CB_CONNECT_LOOP = _lib.SSL_CB_CONNECT_LOOP
SSL_CB_CONNECT_EXIT = _lib.SSL_CB_CONNECT_EXIT
SSL_CB_HANDSHAKE_START = _lib.SSL_CB_HANDSHAKE_START
SSL_CB_HANDSHAKE_DONE = _lib.SSL_CB_HANDSHAKE_DONE

# Taken from https://golang.org/src/crypto/x509/root_linux.go
_CERTIFICATE_FILE_LOCATIONS = [
    "/etc/ssl/certs/ca-certificates.crt",  # Debian/Ubuntu/Gentoo etc.
    "/etc/pki/tls/certs/ca-bundle.crt",  # Fedora/RHEL 6
    "/etc/ssl/ca-bundle.pem",  # OpenSUSE
    "/etc/pki/tls/cacert.pem",  # OpenELEC
    "/etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem",  # CentOS/RHEL 7
]

_CERTIFICATE_PATH_LOCATIONS = [
    "/etc/ssl/certs",  # SLES10/SLES11
]

# These values are compared to output from cffi's ffi.string so they must be
# byte strings.
_CRYPTOGRAPHY_MANYLINUX1_CA_DIR = b"/opt/pyca/cryptography/openssl/certs"
_CRYPTOGRAPHY_MANYLINUX1_CA_FILE = b"/opt/pyca/cryptography/openssl/cert.pem"


class Error(Exception):
    """
    An error occurred in an `OpenSSL.SSL` API.
    """


_raise_current_error = partial(_exception_from_error_queue, Error)
_openssl_assert = _make_assert(Error)


class WantReadError(Error):
    pass


class WantWriteError(Error):
    pass


class WantX509LookupError(Error):
    pass


class ZeroReturnError(Error):
    pass


class SysCallError(Error):
    pass


class _CallbackExceptionHelper(object):
    """
    A base class for wrapper classes that allow for intelligent exception
    handling in OpenSSL callbacks.

    :ivar list _problems: Any exceptions that occurred while executing in a
        context where they could not be raised in the normal way.  Typically
        this is because OpenSSL has called into some Python code and requires a
        return value.  The exceptions are saved to be raised later when it is
        possible to do so.
    """

    def __init__(self):
        self._problems = []

    def raise_if_problem(self):
        """
        Raise an exception from the OpenSSL error queue or that was previously
        captured whe running a callback.
        """
        if self._problems:
            try:
                _raise_current_error()
            except Error:
                pass
            raise self._problems.pop(0)


class _VerifyHelper(_CallbackExceptionHelper):
    """
    Wrap a callback such that it can be used as a certificate verification
    callback.
    """

    def __init__(self, callback):
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ok, store_ctx):
            x509 = _lib.X509_STORE_CTX_get_current_cert(store_ctx)
            _lib.X509_up_ref(x509)
            cert = X509._from_raw_x509_ptr(x509)
            error_number = _lib.X509_STORE_CTX_get_error(store_ctx)
            error_depth = _lib.X509_STORE_CTX_get_error_depth(store_ctx)

            index = _lib.SSL_get_ex_data_X509_STORE_CTX_idx()
            ssl = _lib.X509_STORE_CTX_get_ex_data(store_ctx, index)
            connection = Connection._reverse_mapping[ssl]

            try:
                result = callback(
                    connection, cert, error_number, error_depth, ok
                )
            except Exception as e:
                self._problems.append(e)
                return 0
            else:
                if result:
                    _lib.X509_STORE_CTX_set_error(store_ctx, _lib.X509_V_OK)
                    return 1
                else:
                    return 0

        self.callback = _ffi.callback(
            "int (*)(int, X509_STORE_CTX *)", wrapper)


class _NpnAdvertiseHelper(_CallbackExceptionHelper):
    """
    Wrap a callback such that it can be used as an NPN advertisement callback.
    """

    def __init__(self, callback):
        _CallbackExceptionHelper.__init__(self)

        @wraps(callback)
        def wrapper(ssl, out, outlen, arg):
Loading ...