# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
from __future__ import absolute_import, division, print_function
import base64
import calendar
import collections
import contextlib
import itertools
from contextlib import contextmanager
import six
from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
from cryptography.hazmat.backends.interfaces import (
CMACBackend, CipherBackend, DERSerializationBackend, DHBackend, DSABackend,
EllipticCurveBackend, HMACBackend, HashBackend, PBKDF2HMACBackend,
PEMSerializationBackend, RSABackend, ScryptBackend, X509Backend
)
from cryptography.hazmat.backends.openssl import aead
from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
from cryptography.hazmat.backends.openssl.cmac import _CMACContext
from cryptography.hazmat.backends.openssl.decode_asn1 import _Integers
from cryptography.hazmat.backends.openssl.dh import (
_DHParameters, _DHPrivateKey, _DHPublicKey, _dh_params_dup
)
from cryptography.hazmat.backends.openssl.dsa import (
_DSAParameters, _DSAPrivateKey, _DSAPublicKey
)
from cryptography.hazmat.backends.openssl.ec import (
_EllipticCurvePrivateKey, _EllipticCurvePublicKey
)
from cryptography.hazmat.backends.openssl.encode_asn1 import (
_CRL_ENTRY_EXTENSION_ENCODE_HANDLERS,
_CRL_EXTENSION_ENCODE_HANDLERS, _EXTENSION_ENCODE_HANDLERS,
_encode_asn1_int_gc, _encode_asn1_str_gc, _encode_name_gc, _txt2obj_gc,
)
from cryptography.hazmat.backends.openssl.hashes import _HashContext
from cryptography.hazmat.backends.openssl.hmac import _HMACContext
from cryptography.hazmat.backends.openssl.rsa import (
_RSAPrivateKey, _RSAPublicKey
)
from cryptography.hazmat.backends.openssl.x25519 import (
_X25519PrivateKey, _X25519PublicKey
)
from cryptography.hazmat.backends.openssl.x509 import (
_Certificate, _CertificateRevocationList,
_CertificateSigningRequest, _RevokedCertificate
)
from cryptography.hazmat.bindings.openssl import binding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
from cryptography.hazmat.primitives.asymmetric.padding import (
MGF1, OAEP, PKCS1v15, PSS
)
from cryptography.hazmat.primitives.ciphers.algorithms import (
AES, ARC4, Blowfish, CAST5, Camellia, ChaCha20, IDEA, SEED, TripleDES
)
from cryptography.hazmat.primitives.ciphers.modes import (
CBC, CFB, CFB8, CTR, ECB, GCM, OFB, XTS
)
from cryptography.hazmat.primitives.kdf import scrypt
_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
@utils.register_interface(CipherBackend)
@utils.register_interface(CMACBackend)
@utils.register_interface(DERSerializationBackend)
@utils.register_interface(DHBackend)
@utils.register_interface(DSABackend)
@utils.register_interface(EllipticCurveBackend)
@utils.register_interface(HashBackend)
@utils.register_interface(HMACBackend)
@utils.register_interface(PBKDF2HMACBackend)
@utils.register_interface(RSABackend)
@utils.register_interface(PEMSerializationBackend)
@utils.register_interface(X509Backend)
@utils.register_interface_if(
binding.Binding().lib.Cryptography_HAS_SCRYPT, ScryptBackend
)
class Backend(object):
"""
OpenSSL API binding interfaces.
"""
name = "openssl"
def __init__(self):
self._binding = binding.Binding()
self._ffi = self._binding.ffi
self._lib = self._binding.lib
self._cipher_registry = {}
self._register_default_ciphers()
self.activate_osrandom_engine()
self._dh_types = [self._lib.EVP_PKEY_DH]
if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
self._dh_types.append(self._lib.EVP_PKEY_DHX)
def openssl_assert(self, ok):
return binding._openssl_assert(self._lib, ok)
def activate_builtin_random(self):
# Obtain a new structural reference.
e = self._lib.ENGINE_get_default_RAND()
if e != self._ffi.NULL:
self._lib.ENGINE_unregister_RAND(e)
# Reset the RNG to use the new engine.
self._lib.RAND_cleanup()
# decrement the structural reference from get_default_RAND
res = self._lib.ENGINE_finish(e)
self.openssl_assert(res == 1)
@contextlib.contextmanager
def _get_osurandom_engine(self):
# Fetches an engine by id and returns it. This creates a structural
# reference.
e = self._lib.ENGINE_by_id(self._binding._osrandom_engine_id)
self.openssl_assert(e != self._ffi.NULL)
# Initialize the engine for use. This adds a functional reference.
res = self._lib.ENGINE_init(e)
self.openssl_assert(res == 1)
try:
yield e
finally:
# Decrement the structural ref incremented by ENGINE_by_id.
res = self._lib.ENGINE_free(e)
self.openssl_assert(res == 1)
# Decrement the functional ref incremented by ENGINE_init.
res = self._lib.ENGINE_finish(e)
self.openssl_assert(res == 1)
def activate_osrandom_engine(self):
# Unregister and free the current engine.
self.activate_builtin_random()
with self._get_osurandom_engine() as e:
# Set the engine as the default RAND provider.
res = self._lib.ENGINE_set_default_RAND(e)
self.openssl_assert(res == 1)
# Reset the RNG to use the new engine.
self._lib.RAND_cleanup()
def osrandom_engine_implementation(self):
buf = self._ffi.new("char[]", 64)
with self._get_osurandom_engine() as e:
res = self._lib.ENGINE_ctrl_cmd(e, b"get_implementation",
len(buf), buf,
self._ffi.NULL, 0)
self.openssl_assert(res > 0)
return self._ffi.string(buf).decode('ascii')
def openssl_version_text(self):
"""
Friendly string name of the loaded OpenSSL library. This is not
necessarily the same version as it was compiled against.
Example: OpenSSL 1.0.1e 11 Feb 2013
"""
return self._ffi.string(
self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
).decode("ascii")
def openssl_version_number(self):
return self._lib.OpenSSL_version_num()
def create_hmac_ctx(self, key, algorithm):
return _HMACContext(self, key, algorithm)
def _build_openssl_digest_name(self, algorithm):
if algorithm.name == "blake2b" or algorithm.name == "blake2s":
alg = "{0}{1}".format(
algorithm.name, algorithm.digest_size * 8
).encode("ascii")
else:
alg = algorithm.name.encode("ascii")
return alg
def hash_supported(self, algorithm):
name = self._build_openssl_digest_name(algorithm)
digest = self._lib.EVP_get_digestbyname(name)
return digest != self._ffi.NULL
def hmac_supported(self, algorithm):
return self.hash_supported(algorithm)
def create_hash_ctx(self, algorithm):
return _HashContext(self, algorithm)
def cipher_supported(self, cipher, mode):
try:
adapter = self._cipher_registry[type(cipher), type(mode)]
except KeyError:
return False
evp_cipher = adapter(self, cipher, mode)
return self._ffi.NULL != evp_cipher
def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
if (cipher_cls, mode_cls) in self._cipher_registry:
raise ValueError("Duplicate registration for: {0} {1}.".format(
cipher_cls, mode_cls)
)
self._cipher_registry[cipher_cls, mode_cls] = adapter
def _register_default_ciphers(self):
for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
self.register_cipher_adapter(
AES,
mode_cls,
GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
)
for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
self.register_cipher_adapter(
Camellia,
mode_cls,
GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
)
for mode_cls in [CBC, CFB, CFB8, OFB]:
self.register_cipher_adapter(
TripleDES,
mode_cls,
GetCipherByName("des-ede3-{mode.name}")
)
self.register_cipher_adapter(
TripleDES,
ECB,
GetCipherByName("des-ede3")
)
for mode_cls in [CBC, CFB, OFB, ECB]:
self.register_cipher_adapter(
Blowfish,
mode_cls,
GetCipherByName("bf-{mode.name}")
)
for mode_cls in [CBC, CFB, OFB, ECB]:
self.register_cipher_adapter(
SEED,
mode_cls,
GetCipherByName("seed-{mode.name}")
)
for cipher_cls, mode_cls in itertools.product(
[CAST5, IDEA],
[CBC, OFB, CFB, ECB],
):
self.register_cipher_adapter(
cipher_cls,
mode_cls,
GetCipherByName("{cipher.name}-{mode.name}")
)
self.register_cipher_adapter(
ARC4,
type(None),
GetCipherByName("rc4")
)
self.register_cipher_adapter(
ChaCha20,
type(None),
GetCipherByName("chacha20")
)
self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
def create_symmetric_encryption_ctx(self, cipher, mode):
return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
def create_symmetric_decryption_ctx(self, cipher, mode):
return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
def pbkdf2_hmac_supported(self, algorithm):
return self.hmac_supported(algorithm)
def derive_pbkdf2_hmac(self, algorithm, length, salt, iterations,
key_material):
buf = self._ffi.new("unsigned char[]", length)
evp_md = self._lib.EVP_get_digestbyname(
algorithm.name.encode("ascii"))
self.openssl_assert(evp_md != self._ffi.NULL)
res = self._lib.PKCS5_PBKDF2_HMAC(
key_material,
len(key_material),
salt,
len(salt),
iterations,
evp_md,
length,
buf
)
self.openssl_assert(res == 1)
return self._ffi.buffer(buf)[:]
def _consume_errors(self):
return binding._consume_errors(self._lib)
def _bn_to_int(self, bn):
assert bn != self._ffi.NULL
if six.PY3:
# Python 3 has constant time from_bytes, so use that.
bn_num_bytes = self._lib.BN_num_bytes(bn)
bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
# A zero length means the BN has value 0
self.openssl_assert(bin_len >= 0)
return int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
else:
# Under Python 2 the best we can do is hex()
hex_cdata = self._lib.BN_bn2hex(bn)
self.openssl_assert(hex_cdata != self._ffi.NULL)
hex_str = self._ffi.string(hex_cdata)
self._lib.OPENSSL_free(hex_cdata)
return int(hex_str, 16)
def _int_to_bn(self, num, bn=None):
"""
Converts a python integer to a BIGNUM. The returned BIGNUM will not
be garbage collected (to support adding them to structs that take
ownership of the object). Be sure to register it for GC if it will
be discarded after use.
"""
assert bn is None or bn != self._ffi.NULL
if bn is None:
bn = self._ffi.NULL
if six.PY3:
# Python 3 has constant time to_bytes, so use that.
binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
self.openssl_assert(bn_ptr != self._ffi.NULL)
return bn_ptr
else:
# Under Python 2 the best we can do is hex(), [2:] removes the 0x
# prefix.
hex_num = hex(num).rstrip("L")[2:].encode("ascii")
bn_ptr = self._ffi.new("BIGNUM **")
bn_ptr[0] = bn
res = self._lib.BN_hex2bn(bn_ptr, hex_num)
self.openssl_assert(res != 0)
self.openssl_assert(bn_ptr[0] != self._ffi.NULL)
Loading ...