Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

aroundthecode / pycryptodome   python

Repository URL to install this package:

/ SelfTest / Cipher / test_CTR.py

# ===================================================================
#
# Copyright (c) 2015, Legrandin <helderijs@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in
#    the documentation and/or other materials provided with the
#    distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# ===================================================================

import unittest
from binascii import hexlify, unhexlify

from Crypto.SelfTest.st_common import list_test_cases
from Crypto.Util.py3compat import tobytes, bchr
from Crypto.Cipher import AES, DES3
from Crypto.Hash import SHAKE128, SHA256
from Crypto.Util import Counter

def get_tag_random(tag, length):
    """Return `length` bytes derived from `tag` with SHAKE128.

    The tag is converted to bytes and absorbed by a fresh SHAKE128
    object; the first `length` bytes of its output are returned. The
    tests use this to obtain keys, nonces and plaintexts from a label.
    """
    xof = SHAKE128.new(data=tobytes(tag))
    return xof.read(length)

class CtrTests(unittest.TestCase):
    """Functional tests for CTR mode on top of AES and Triple DES.

    Per-test notes are written as comments rather than docstrings, so
    that unittest keeps reporting each test by its method name.
    """

    # Fixtures shared by all tests, derived from fixed labels.
    key_128 = get_tag_random("key_128", 16)
    key_192 = get_tag_random("key_192", 24)
    nonce_32 = get_tag_random("nonce_32", 4)
    nonce_64 = get_tag_random("nonce_64", 8)
    # 4-byte prefix + 32-bit counter (used with DES3) and
    # 8-byte prefix + 64-bit counter (used with AES).
    ctr_64 = Counter.new(32, prefix=nonce_32)
    ctr_128 = Counter.new(64, prefix=nonce_64)

    def test_loopback_128(self):
        # Encrypting and then decrypting with AES returns the plaintext
        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128)
        pt = get_tag_random("plaintext", 16 * 100)
        ct = cipher.encrypt(pt)

        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128)
        pt2 = cipher.decrypt(ct)
        self.assertEqual(pt, pt2)

    def test_loopback_64(self):
        # Encrypting and then decrypting with DES3 returns the plaintext
        cipher = DES3.new(self.key_192, DES3.MODE_CTR, counter=self.ctr_64)
        pt = get_tag_random("plaintext", 8 * 100)
        ct = cipher.encrypt(pt)

        cipher = DES3.new(self.key_192, DES3.MODE_CTR, counter=self.ctr_64)
        pt2 = cipher.decrypt(ct)
        self.assertEqual(pt, pt2)

    def test_invalid_counter_parameter(self):
        # Counter object is required for ciphers with short block size
        self.assertRaises(TypeError, DES3.new, self.key_192, DES3.MODE_CTR)
        # Positional arguments are not allowed (Counter must be passed as
        # keyword)
        self.assertRaises(TypeError, AES.new, self.key_128, AES.MODE_CTR,
                          self.ctr_128)

    def test_nonce_attribute(self):
        # Nonce attribute is the prefix passed to Counter (DES3)
        cipher = DES3.new(self.key_192, DES3.MODE_CTR, counter=self.ctr_64)
        self.assertEqual(cipher.nonce, self.nonce_32)

        # Nonce attribute is the prefix passed to Counter (AES)
        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128)
        self.assertEqual(cipher.nonce, self.nonce_64)

        # Nonce attribute is not defined if suffix is used in Counter
        counter = Counter.new(64, prefix=self.nonce_32, suffix=self.nonce_32)
        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=counter)
        # assertFalse replaces the deprecated alias failIf, which newer
        # unittest versions no longer provide.
        self.assertFalse(hasattr(cipher, "nonce"))

    def test_nonce_parameter(self):
        # Nonce parameter becomes nonce attribute
        cipher1 = AES.new(self.key_128, AES.MODE_CTR, nonce=self.nonce_64)
        self.assertEqual(cipher1.nonce, self.nonce_64)

        counter = Counter.new(64, prefix=self.nonce_64, initial_value=0)
        cipher2 = AES.new(self.key_128, AES.MODE_CTR, counter=counter)
        self.assertEqual(cipher1.nonce, cipher2.nonce)

        pt = get_tag_random("plaintext", 65536)
        self.assertEqual(cipher1.encrypt(pt), cipher2.encrypt(pt))

        # Nonce is implicitly created (for AES) when no parameters are passed
        nonce1 = AES.new(self.key_128, AES.MODE_CTR).nonce
        nonce2 = AES.new(self.key_128, AES.MODE_CTR).nonce
        self.assertNotEqual(nonce1, nonce2)
        self.assertEqual(len(nonce1), 8)

        # Nonce can be zero-length
        cipher = AES.new(self.key_128, AES.MODE_CTR, nonce=b"")
        self.assertEqual(b"", cipher.nonce)
        cipher.encrypt(b'0'*300)

        # Nonce and Counter are mutually exclusive
        self.assertRaises(TypeError, AES.new, self.key_128, AES.MODE_CTR,
                          counter=self.ctr_128, nonce=self.nonce_64)

    def test_initial_value_parameter(self):
        # Test with nonce parameter
        cipher1 = AES.new(self.key_128, AES.MODE_CTR,
                          nonce=self.nonce_64, initial_value=0xFFFF)
        counter = Counter.new(64, prefix=self.nonce_64, initial_value=0xFFFF)
        cipher2 = AES.new(self.key_128, AES.MODE_CTR, counter=counter)
        pt = get_tag_random("plaintext", 65536)
        self.assertEqual(cipher1.encrypt(pt), cipher2.encrypt(pt))

        # Test without nonce parameter
        cipher1 = AES.new(self.key_128, AES.MODE_CTR,
                          initial_value=0xFFFF)
        counter = Counter.new(64, prefix=cipher1.nonce, initial_value=0xFFFF)
        cipher2 = AES.new(self.key_128, AES.MODE_CTR, counter=counter)
        pt = get_tag_random("plaintext", 65536)
        self.assertEqual(cipher1.encrypt(pt), cipher2.encrypt(pt))

        # Initial_value and Counter are mutually exclusive
        self.assertRaises(TypeError, AES.new, self.key_128, AES.MODE_CTR,
                          counter=self.ctr_128, initial_value=0)

    def test_initial_value_bytes_parameter(self):
        # Same result as when passing an integer
        cipher1 = AES.new(self.key_128, AES.MODE_CTR,
                          nonce=self.nonce_64,
                          initial_value=b"\x00"*6+b"\xFF\xFF")
        cipher2 = AES.new(self.key_128, AES.MODE_CTR,
                          nonce=self.nonce_64, initial_value=0xFFFF)
        pt = get_tag_random("plaintext", 65536)
        self.assertEqual(cipher1.encrypt(pt), cipher2.encrypt(pt))

        # Fail if the iv is too large
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_CTR,
                          initial_value=b"5"*17)
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_CTR,
                          nonce=self.nonce_64, initial_value=b"5"*9)

        # Fail if the iv is too short
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_CTR,
                          initial_value=b"5"*15)
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_CTR,
                          nonce=self.nonce_64, initial_value=b"5"*7)

    def test_iv_with_matching_length(self):
        # A counter block one byte shorter (120 bits) or one byte longer
        # (136 bits) than the 128-bit AES block is rejected
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_CTR,
                          counter=Counter.new(120))
        self.assertRaises(ValueError, AES.new, self.key_128, AES.MODE_CTR,
                          counter=Counter.new(136))

    def test_block_size_128(self):
        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128)
        self.assertEqual(cipher.block_size, AES.block_size)

    def test_block_size_64(self):
        cipher = DES3.new(self.key_192, DES3.MODE_CTR, counter=self.ctr_64)
        self.assertEqual(cipher.block_size, DES3.block_size)

    def test_unaligned_data_128(self):
        # Processing 7-byte pieces one at a time must give the same output
        # as a single call on their concatenation, in both directions.
        pieces = [b"7777777"] * 100
        whole = b"".join(pieces)

        for func in "encrypt", "decrypt":
            cipher = AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128)
            chunked = [getattr(cipher, func)(x) for x in pieces]
            cipher = AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128)
            self.assertEqual(b"".join(chunked), getattr(cipher, func)(whole))

    def test_unaligned_data_64(self):
        # Same check as test_unaligned_data_128, with a 64-bit block cipher
        pieces = [b"7777777"] * 100
        whole = b"".join(pieces)

        for func in "encrypt", "decrypt":
            cipher = DES3.new(self.key_192, DES3.MODE_CTR, counter=self.ctr_64)
            chunked = [getattr(cipher, func)(x) for x in pieces]
            cipher = DES3.new(self.key_192, DES3.MODE_CTR, counter=self.ctr_64)
            self.assertEqual(b"".join(chunked), getattr(cipher, func)(whole))

    def test_unknown_parameters(self):
        self.assertRaises(TypeError, AES.new, self.key_128, AES.MODE_CTR,
                          7, counter=self.ctr_128)
        self.assertRaises(TypeError, AES.new, self.key_128, AES.MODE_CTR,
                          counter=self.ctr_128, unknown=7)
        # But some are only known by the base cipher (e.g. use_aesni
        # consumed by the AES module)
        AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128,
                use_aesni=False)

    def test_null_encryption_decryption(self):
        # Empty input yields empty output for both operations
        for func in "encrypt", "decrypt":
            cipher = AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128)
            result = getattr(cipher, func)(b"")
            self.assertEqual(result, b"")

    def test_either_encrypt_or_decrypt(self):
        # Once a cipher object has been used in one direction, using it
        # in the other direction raises TypeError
        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128)
        cipher.encrypt(b"")
        self.assertRaises(TypeError, cipher.decrypt, b"")

        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=self.ctr_128)
        cipher.decrypt(b"")
        self.assertRaises(TypeError, cipher.encrypt, b"")

    def test_wrap_around(self):
        # Counter is only 8 bits, so we can only encrypt/decrypt 256 blocks
        # (=4096 bytes)
        counter = Counter.new(8, prefix=bchr(9) * 15)
        max_bytes = 4096

        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=counter)
        cipher.encrypt(b'9' * max_bytes)
        self.assertRaises(OverflowError, cipher.encrypt, b'9')

        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=counter)
        self.assertRaises(OverflowError, cipher.encrypt,
                          b'9' * (max_bytes + 1))

        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=counter)
        cipher.decrypt(b'9' * max_bytes)
        self.assertRaises(OverflowError, cipher.decrypt, b'9')

        cipher = AES.new(self.key_128, AES.MODE_CTR, counter=counter)
        self.assertRaises(OverflowError, cipher.decrypt,
                          b'9' * (max_bytes + 1))

    def test_bytearray(self):
        # bytearray arguments must behave exactly like bytes arguments
        data = b"1" * 16
        iv = b"\x00" * 6 + b"\xFF\xFF"

        # Encrypt
        cipher1 = AES.new(self.key_128, AES.MODE_CTR,
                          nonce=self.nonce_64,
                          initial_value=iv)
        ref1 = cipher1.encrypt(data)

        cipher2 = AES.new(self.key_128, AES.MODE_CTR,
                          nonce=bytearray(self.nonce_64),
                          initial_value=bytearray(iv))
        ref2 = cipher2.encrypt(bytearray(data))

        self.assertEqual(ref1, ref2)
        self.assertEqual(cipher1.nonce, cipher2.nonce)

        # Decrypt
        cipher3 = AES.new(self.key_128, AES.MODE_CTR,
                          nonce=self.nonce_64,
                          initial_value=iv)
        ref3 = cipher3.decrypt(data)

        cipher4 = AES.new(self.key_128, AES.MODE_CTR,
                          nonce=bytearray(self.nonce_64),
                          initial_value=bytearray(iv))
        ref4 = cipher4.decrypt(bytearray(data))

        self.assertEqual(ref3, ref4)

    def test_very_long_data(self):
        # Known-answer check: SHA-256 of the ciphertext of 1,000,000 bytes
        cipher = AES.new(b'A' * 32, AES.MODE_CTR, nonce=b'')
        ct = cipher.encrypt(b'B' * 1000000)
        digest = SHA256.new(ct).hexdigest()
        self.assertEqual(digest, "96204fc470476561a3a8f3b6fe6d24be85c87510b638142d1d0fb90989f8a6a6")

    def test_output_param(self):
        # With output=..., the result is written into the supplied buffer
        # and the call itself returns None

        pt = b'5' * 16
        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        ct = cipher.encrypt(pt)

        output = bytearray(16)
        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        res = cipher.encrypt(pt, output=output)
        self.assertEqual(ct, output)
        self.assertEqual(res, None)

        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        res = cipher.decrypt(ct, output=output)
        self.assertEqual(pt, output)
        self.assertEqual(res, None)

    def test_output_param_memoryview(self):
        # Same as test_output_param, with a memoryview over a bytearray

        pt = b'5' * 16
        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        ct = cipher.encrypt(pt)

        output = memoryview(bytearray(16))
        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        cipher.encrypt(pt, output=output)
        self.assertEqual(ct, output)

        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        cipher.decrypt(ct, output=output)
        self.assertEqual(pt, output)

    def test_output_param_neg(self):
        # An immutable bytes output raises TypeError; an output buffer
        # shorter than the input raises ValueError

        pt = b'5' * 16
        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        ct = cipher.encrypt(pt)

        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        self.assertRaises(TypeError, cipher.encrypt, pt, output=b'0'*16)

        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        self.assertRaises(TypeError, cipher.decrypt, ct, output=b'0'*16)

        shorter_output = bytearray(15)
        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        self.assertRaises(ValueError, cipher.encrypt, pt,
                          output=shorter_output)
        cipher = AES.new(b'4'*16, AES.MODE_CTR, nonce=self.nonce_64)
        self.assertRaises(ValueError, cipher.decrypt, ct,
                          output=shorter_output)

    import sys
    # Drop the memoryview test on Python 2.6 (presumably because the
    # memoryview built-in is not available there). The version is compared
    # as a tuple: slicing the version string is fragile, e.g. "3.10"
    # would be truncated to "3.1".
    if sys.version_info[:2] == (2, 6):
        del test_output_param_memoryview


class SP800TestVectors(unittest.TestCase):
    """Class exercising the CTR test vectors found in Section F.5
    of NIST SP 800-38A"""

    def test_aes_128(self):
        plaintext =     '6bc1bee22e409f96e93d7e117393172a' +\
                        'ae2d8a571e03ac9c9eb76fac45af8e51' +\
                        '30c81c46a35ce411e5fbc1191a0a52ef' +\
                        'f69f2445df4f9b17ad2b417be66c3710'
        ciphertext =    '874d6191b620e3261bef6864990db6ce' +\
                        '9806f66b7970fdff8617187bb9fffdff' +\
                        '5ae4df3edbd5d35e5b4f09020db03eab' +\
Loading ...