Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aroundthecode / pyOpenSSL   python

Repository URL to install this package:

Version: 18.0.0 

/ crypto.py

import datetime

from base64 import b16encode
from functools import partial
from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__

from six import (
    integer_types as _integer_types,
    text_type as _text_type,
    PY3 as _PY3)

from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import dsa, rsa
from cryptography.utils import deprecated

from OpenSSL._util import (
    ffi as _ffi,
    lib as _lib,
    exception_from_error_queue as _exception_from_error_queue,
    byte_string as _byte_string,
    native as _native,
    UNSPECIFIED as _UNSPECIFIED,
    text_to_bytes_and_warn as _text_to_bytes_and_warn,
    make_assert as _make_assert,
)

__all__ = [
    'FILETYPE_PEM',
    'FILETYPE_ASN1',
    'FILETYPE_TEXT',
    'TYPE_RSA',
    'TYPE_DSA',
    'Error',
    'PKey',
    'get_elliptic_curves',
    'get_elliptic_curve',
    'X509Name',
    'X509Extension',
    'X509Req',
    'X509',
    'X509StoreFlags',
    'X509Store',
    'X509StoreContextError',
    'X509StoreContext',
    'load_certificate',
    'dump_certificate',
    'dump_publickey',
    'dump_privatekey',
    'Revoked',
    'CRL',
    'PKCS7',
    'PKCS12',
    'NetscapeSPKI',
    'load_publickey',
    'load_privatekey',
    'dump_certificate_request',
    'load_certificate_request',
    'sign',
    'verify',
    'dump_crl',
    'load_crl',
    'load_pkcs7_data',
    'load_pkcs12'
]

FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1

# TODO This was an API mistake.  OpenSSL has no such constant.
FILETYPE_TEXT = 2 ** 16 - 1

TYPE_RSA = _lib.EVP_PKEY_RSA
TYPE_DSA = _lib.EVP_PKEY_DSA


class Error(Exception):
    """
    An error occurred in an `OpenSSL.crypto` API.
    """


_raise_current_error = partial(_exception_from_error_queue, Error)
_openssl_assert = _make_assert(Error)


def _get_backend():
    """
    Importing the backend from cryptography has the side effect of activating
    the osrandom engine. This mutates the global state of OpenSSL in the
    process and causes issues for various programs that use subinterpreters or
    embed Python. By putting the import in this function we can avoid
    triggering this side effect unless _get_backend is called.
    """
    from cryptography.hazmat.backends.openssl.backend import backend
    return backend


def _untested_error(where):
    """
    An OpenSSL API failed somehow.  Additionally, the failure which was
    encountered isn't one that's exercised by the test suite so future behavior
    of pyOpenSSL is now somewhat less predictable.
    """
    raise RuntimeError("Unknown %s failure" % (where,))


def _new_mem_buf(buffer=None):
    """
    Allocate a new OpenSSL memory BIO.

    Arrange for the garbage collector to clean it up automatically.

    :param buffer: None or some bytes to use to put into the BIO so that they
        can be read out.
    """
    if buffer is None:
        bio = _lib.BIO_new(_lib.BIO_s_mem())
        free = _lib.BIO_free
    else:
        data = _ffi.new("char[]", buffer)
        bio = _lib.BIO_new_mem_buf(data, len(buffer))

        # Keep the memory alive as long as the bio is alive!
        def free(bio, ref=data):
            return _lib.BIO_free(bio)

    _openssl_assert(bio != _ffi.NULL)

    bio = _ffi.gc(bio, free)
    return bio


def _bio_to_string(bio):
    """
    Copy the contents of an OpenSSL BIO object into a Python byte string.
    """
    result_buffer = _ffi.new('char**')
    buffer_length = _lib.BIO_get_mem_data(bio, result_buffer)
    return _ffi.buffer(result_buffer[0], buffer_length)[:]


def _set_asn1_time(boundary, when):
    """
    The the time value of an ASN1 time object.

    @param boundary: An ASN1_TIME pointer (or an object safely
        castable to that type) which will have its value set.
    @param when: A string representation of the desired time value.

    @raise TypeError: If C{when} is not a L{bytes} string.
    @raise ValueError: If C{when} does not represent a time in the required
        format.
    @raise RuntimeError: If the time value cannot be set for some other
        (unspecified) reason.
    """
    if not isinstance(when, bytes):
        raise TypeError("when must be a byte string")

    set_result = _lib.ASN1_TIME_set_string(boundary, when)
    if set_result == 0:
        raise ValueError("Invalid string")


def _get_asn1_time(timestamp):
    """
    Retrieve the time value of an ASN1 time object.

    @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to
        that type) from which the time value will be retrieved.

    @return: The time value from C{timestamp} as a L{bytes} string in a certain
        format.  Or C{None} if the object contains no time value.
    """
    string_timestamp = _ffi.cast('ASN1_STRING*', timestamp)
    if _lib.ASN1_STRING_length(string_timestamp) == 0:
        return None
    elif (
        _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME
    ):
        return _ffi.string(_lib.ASN1_STRING_data(string_timestamp))
    else:
        generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**")
        _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp)
        if generalized_timestamp[0] == _ffi.NULL:
            # This may happen:
            #   - if timestamp was not an ASN1_TIME
            #   - if allocating memory for the ASN1_GENERALIZEDTIME failed
            #   - if a copy of the time data from timestamp cannot be made for
            #     the newly allocated ASN1_GENERALIZEDTIME
            #
            # These are difficult to test.  cffi enforces the ASN1_TIME type.
            # Memory allocation failures are a pain to trigger
            # deterministically.
            _untested_error("ASN1_TIME_to_generalizedtime")
        else:
            string_timestamp = _ffi.cast(
                "ASN1_STRING*", generalized_timestamp[0])
            string_data = _lib.ASN1_STRING_data(string_timestamp)
            string_result = _ffi.string(string_data)
            _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0])
            return string_result


class _X509NameInvalidator(object):
    def __init__(self):
        self._names = []

    def add(self, name):
        self._names.append(name)

    def clear(self):
        for name in self._names:
            # Breaks the object, but also prevents UAF!
            del name._name


class PKey(object):
    """
    A class representing an DSA or RSA public key or key pair.
    """
    _only_public = False
    _initialized = True

    def __init__(self):
        pkey = _lib.EVP_PKEY_new()
        self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
        self._initialized = False

    def to_cryptography_key(self):
        """
        Export as a ``cryptography`` key.

        :rtype: One of ``cryptography``'s `key interfaces`_.

        .. _key interfaces: https://cryptography.io/en/latest/hazmat/\
            primitives/asymmetric/rsa/#key-interfaces

        .. versionadded:: 16.1.0
        """
        backend = _get_backend()
        if self._only_public:
            return backend._evp_pkey_to_public_key(self._pkey)
        else:
            return backend._evp_pkey_to_private_key(self._pkey)

    @classmethod
    def from_cryptography_key(cls, crypto_key):
        """
        Construct based on a ``cryptography`` *crypto_key*.

        :param crypto_key: A ``cryptography`` key.
        :type crypto_key: One of ``cryptography``'s `key interfaces`_.

        :rtype: PKey

        .. versionadded:: 16.1.0
        """
        pkey = cls()
        if not isinstance(crypto_key, (rsa.RSAPublicKey, rsa.RSAPrivateKey,
                                       dsa.DSAPublicKey, dsa.DSAPrivateKey)):
            raise TypeError("Unsupported key type")

        pkey._pkey = crypto_key._evp_pkey
        if isinstance(crypto_key, (rsa.RSAPublicKey, dsa.DSAPublicKey)):
            pkey._only_public = True
        pkey._initialized = True
        return pkey

    def generate_key(self, type, bits):
        """
        Generate a key pair of the given type, with the given number of bits.

        This generates a key "into" the this object.

        :param type: The key type.
        :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA`
        :param bits: The number of bits.
        :type bits: :py:data:`int` ``>= 0``
        :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't
            of the appropriate type.
        :raises ValueError: If the number of bits isn't an integer of
            the appropriate size.
        :return: ``None``
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        # TODO Check error return
        exponent = _lib.BN_new()
        exponent = _ffi.gc(exponent, _lib.BN_free)
        _lib.BN_set_word(exponent, _lib.RSA_F4)

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")

            rsa = _lib.RSA_new()

            result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL)
            _openssl_assert(result == 1)

            result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa)
            _openssl_assert(result == 1)

        elif type == TYPE_DSA:
            dsa = _lib.DSA_new()
            _openssl_assert(dsa != _ffi.NULL)

            dsa = _ffi.gc(dsa, _lib.DSA_free)
            res = _lib.DSA_generate_parameters_ex(
                dsa, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL
            )
            _openssl_assert(res == 1)

            _openssl_assert(_lib.DSA_generate_key(dsa) == 1)
            _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa) == 1)
        else:
            raise Error("No such key type")

        self._initialized = True

    def check(self):
        """
        Check the consistency of an RSA private key.

        This is the Python equivalent of OpenSSL's ``RSA_check_key``.

        :return: ``True`` if key is consistent.

        :raise OpenSSL.crypto.Error: if the key is inconsistent.

        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked.
        """
        if self._only_public:
            raise TypeError("public key only")

        if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA:
            raise TypeError("key type unsupported")

        rsa = _lib.EVP_PKEY_get1_RSA(self._pkey)
        rsa = _ffi.gc(rsa, _lib.RSA_free)
Loading ...