Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
python-keyczar / tests / keyczar_tests / crypter_test.py
Size: Mime:
#!/usr/bin/python
#
# Copyright 2008 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Testcases to test behavior of Keyczar Crypters.

@author: arkajit.dey@gmail.com (Arkajit Dey)
"""

import cStringIO as StringIO
import os
import random
import unittest

from keyczar import errors
from keyczar import keyczar
from keyczar import readers
from keyczar import writers
from keyczar import util
from keyczar import keys
from keyczar import keyinfo

TEST_DATA = os.path.realpath(os.path.join(os.getcwd(), "..", "..", "testdata"))

class BaseCrypterTest(unittest.TestCase):

  ALL_BUFFER_SIZES = (util.DEFAULT_STREAM_BUFF_SIZE,
                      999,
                      1,
                      -1,
                     )

  def setUp(self):
    self.input_data = "This is some test data"
    self.random_input_data = os.urandom(random.randrange(
      util.DEFAULT_STREAM_BUFF_SIZE * 2 + 1,
      10000))
    self.random_input_data_len = len(self.random_input_data)
    self.all_data = [self.random_input_data,
                     self.__simulateReflow(self.random_input_data)
                    ]

  def __writeToStream(self, stream, data, len_to_write=-1):
    """Helper to write to a stream with varying size writes"""
    if len_to_write == -1:
      stream.write(data)
    else:
      for c in map(None, *(iter(data),) * len_to_write):
        stream.write(''.join([x for x in c if x]))
    stream.flush()

  def __readFromStream(self, stream, len_to_read=-1):
    """Helper to read from a stream in varying size chunks"""
    result = ''
    if len_to_read == -1:
      read_data = True
      while read_data or read_data is None:
        read_data = stream.read()
        if read_data:
          result += read_data
    else:
      read_data = True
      while read_data or read_data is None:
        read_data = stream.read(len_to_read)
        if read_data:
          result += read_data
    stream.close()
    return result

  def __simulateReflow(self, data):
    """Helper to simulate reflowing of data"""
    endings = ['\n', '\r', '\r\n']
    reflowed_data = ''
    for c in map(None, *(iter(data),) * 5):
      d = ''.join([x for x in c if x])
      reflowed_data += '%s%s' %(random.choice(endings), d)
    return reflowed_data

  def __testDecrypt(self, subdir, reader=None):
    path = os.path.join(TEST_DATA, subdir)
    if reader:
      crypter = keyczar.Crypter(reader)
    else:
      crypter = keyczar.Crypter.Read(path)
    active_ciphertext = util.ReadFile(os.path.join(path, "1.out"))
    primary_ciphertext = util.ReadFile(os.path.join(path, "2.out"))
    active_decrypted = crypter.Decrypt(active_ciphertext)
    self.assertEquals(self.input_data, active_decrypted)
    primary_decrypted = crypter.Decrypt(primary_ciphertext)
    self.assertEquals(self.input_data, primary_decrypted)

  def __testDecryptReflowed(self, subdir, reader=None):
    path = os.path.join(TEST_DATA, subdir)
    if reader:
      crypter = keyczar.Crypter(reader)
    else:
      crypter = keyczar.Crypter.Read(path)
    active_ciphertext = util.ReadFile(os.path.join(path, "1.out"))

    reflowed_active_ciphertext = self.__simulateReflow(active_ciphertext)
    active_decrypted = crypter.Decrypt(reflowed_active_ciphertext)
    self.assertEquals(self.input_data, active_decrypted)

    primary_ciphertext = util.ReadFile(os.path.join(path, "2.out"))
    reflowed_primary_ciphertext = self.__simulateReflow(primary_ciphertext)
    primary_decrypted = crypter.Decrypt(reflowed_primary_ciphertext)
    self.assertEquals(self.input_data, primary_decrypted)

  def __testDecryptStream(self, subdir, reader, input_data, stream_buffer_size,
                          len_to_read, stream_source):
    """NOTE: input_data ignored here as we don't have a valid ".out" for
    random data"""
    path = os.path.join(TEST_DATA, subdir)
    if reader:
      crypter = keyczar.Crypter(reader)
    else:
      crypter = keyczar.Crypter.Read(path)
    active_ciphertext = util.ReadFile(os.path.join(path, "1.out"))
    if stream_source is None:
      decoder = None
      active_ciphertext = util.Base64WSDecode(active_ciphertext)
    else:
      decoder = util.IncrementalBase64WSStreamReader
    decryption_stream = crypter.CreateDecryptingStreamReader(
      StringIO.StringIO(active_ciphertext),
      decoder=decoder,
      buffer_size=stream_buffer_size)
    plaintext = self.__readFromStream(decryption_stream, len_to_read)
    self.assertEquals(self.input_data, plaintext,
                      'Active not equals for buffer:%d, read len:%d, src:%s' %(
                        stream_buffer_size,
                        len_to_read,
                        stream_source
                      ))

    primary_ciphertext = util.ReadFile(os.path.join(path, "2.out"))
    if stream_source is None:
      primary_ciphertext = util.Base64WSDecode(primary_ciphertext)
    decryption_stream = crypter.CreateDecryptingStreamReader(
      StringIO.StringIO(primary_ciphertext),
      decoder=decoder,
      buffer_size=stream_buffer_size)
    plaintext = self.__readFromStream(decryption_stream, len_to_read)
    self.assertEquals(self.input_data, plaintext,
                      'Primary not equals for buffer:%d, read len:%d, src:%s' %(
                        stream_buffer_size,
                        len_to_read,
                        stream_source
                      ))

  def __testEncryptAndDecrypt(self, subdir):
    crypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, subdir))
    ciphertext = crypter.Encrypt(self.input_data)
    plaintext = crypter.Decrypt(ciphertext)
    self.assertEquals(self.input_data, plaintext)

    reflowed_ciphertext = self.__simulateReflow(ciphertext)
    plaintext = crypter.Decrypt(reflowed_ciphertext)
    self.assertEquals(self.input_data, plaintext)

    self.__testEncryptAndDecryptUnencoded(subdir)

  def __testEncryptAndDecryptUnencoded(self, subdir):
    crypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, subdir))
    ciphertext = crypter.Encrypt(self.input_data, encoder=None)
    try:
        plaintext = crypter.Decrypt(ciphertext)
    except:
        pass
    plaintext = crypter.Decrypt(ciphertext, decoder=None)
    self.assertEquals(self.input_data, plaintext)

  def __testStandardEncryptAndStreamDecrypt(self, subdir,
                                            input_data,
                                            stream_buffer_size,
                                            len_to_read,
                                            stream_source
                                           ):
    crypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, subdir))
    ciphertext = crypter.Encrypt(input_data)
    ciphertext_stream = StringIO.StringIO(ciphertext)

    if stream_source is None:
      decoder = None
      ciphertext_stream = StringIO.StringIO(util.Base64WSDecode(ciphertext))
    else:
      decoder = util.IncrementalBase64WSStreamReader
    decryption_stream = crypter.CreateDecryptingStreamReader(
      ciphertext_stream,
      decoder=decoder,
      buffer_size=stream_buffer_size)
    plaintext = self.__readFromStream(decryption_stream, len_to_read)
    self.assertEquals(len(input_data), len(plaintext),
                      'Wrong length for buffer:%d, read len:%d'
                      %(stream_buffer_size,
                        len_to_read))
    self.assertEquals(input_data, plaintext,
                      'Not equals for buffer:%d, read len:%d'
                      %(stream_buffer_size,
                        len_to_read))

  def __testStreamEncryptAndStandardDecrypt(self, subdir,
                                            input_data,
                                            stream_buffer_size,
                                            len_to_write,
                                            stream_source
                                           ):
    crypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, subdir))
    ciphertext_stream = StringIO.StringIO()
    if stream_source is None:
      encoder = None
      decoder = None
    else:
      encoder = util.IncrementalBase64WSStreamWriter
      decoder = util.Base64WSDecode
    encryption_stream = crypter.CreateEncryptingStreamWriter(
      ciphertext_stream,
      encoder=encoder)
    self.__writeToStream(encryption_stream, input_data, len_to_write)
    encryption_stream.close()
    ciphertext = ciphertext_stream.getvalue()
    plaintext = crypter.Decrypt(ciphertext, decoder=decoder)
    self.assertEquals(len(input_data), len(plaintext),
                      'Wrong length for buffer:%d, write len:%d'
                      %(stream_buffer_size,
                        len_to_write))
    self.assertEquals(input_data, plaintext,
                      'Not equals for buffer:%d, write len:%d'
                      %(stream_buffer_size,
                        len_to_write))

  def __testStreamEncryptAndStreamDecrypt(self, subdir,
                                          input_data,
                                          stream_buffer_size,
                                          len_to_rw,
                                          stream_source
                                         ):
    crypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, subdir))
    ciphertext_stream = StringIO.StringIO()
    if stream_source is None:
      encoder = None
      decoder = None
    else:
      encoder = util.IncrementalBase64WSStreamWriter
      decoder = util.IncrementalBase64WSStreamReader

    encryption_stream = crypter.CreateEncryptingStreamWriter(
      ciphertext_stream,
      encoder=encoder)
    self.__writeToStream(encryption_stream, input_data, len_to_rw)
    encryption_stream.close()
    ciphertext_stream.reset()

    decryption_stream = crypter.CreateDecryptingStreamReader(
      ciphertext_stream,
      decoder=decoder,
      buffer_size=stream_buffer_size)
    plaintext = self.__readFromStream(decryption_stream, len_to_rw)
    self.assertEquals(len(input_data), len(plaintext),
                      'Wrong length for buffer:%d, r/w len:%d'
                      %(stream_buffer_size,
                        len_to_rw))
    self.assertEquals(input_data, plaintext,
                      'Not equals for buffer:%d, r/w len:%d'
                      %(stream_buffer_size,
                        len_to_rw))

  def __testAllModesAndBufferSizes(self, fn, params, no_data_reqd=False):
    """
    Helper to test the passed test function with a range of read/write buffer
    sizes to test interop between them.
    NOTE: does not use base64 encoding for maximum unit test performance. The
    base64 encoding/decoding is tested in __testStreamEncryptAndStreamDecrypt
    """
    if no_data_reqd:
      avail_data = self.all_data[:1]
    else:
      avail_data = self.all_data

    for data in avail_data:
      for stream_buffer_size in self.ALL_BUFFER_SIZES:
        for len_to_rw in self.ALL_BUFFER_SIZES:
          all_params = list(params) + [data, stream_buffer_size,
                                       len_to_rw, None]
          fn(*all_params)

  def testRsaDecrypt(self):
    self.__testDecrypt("rsa")
    self.__testDecryptReflowed("rsa")

  def testRsaEncryptAndDecrypt(self):
    self.__testEncryptAndDecrypt("rsa")

  def testAesDecrypt(self):
    self.__testDecrypt("aes")
    self.__testDecryptReflowed("aes")
    self.__testAllModesAndBufferSizes(self.__testDecryptStream,
                                      ("aes", None,),
                                      no_data_reqd=True)

  def testAesEncryptedKeyDecrypt(self):
    file_reader = readers.FileReader(os.path.join(TEST_DATA, "aes-crypted"))
    key_decrypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, "aes"))
    reader = readers.EncryptedReader(file_reader, key_decrypter)
    self.__testDecrypt("aes-crypted", reader)
    self.__testDecryptReflowed("aes-crypted", reader)
    self.__testAllModesAndBufferSizes(self.__testDecryptStream,
                                      ("aes-crypted", reader,),
                                      no_data_reqd=True)

  def testAesEncryptAndDecrypt(self):
    self.__testEncryptAndDecrypt("aes")

  def testAesStandardEncryptAndStreamDecryptInterop(self):
    self.__testAllModesAndBufferSizes(
        self.__testStandardEncryptAndStreamDecrypt,
        ("aes",))

  def testAesStreamEncryptAndStandardDecryptInterop(self):
    self.__testAllModesAndBufferSizes(
        self.__testStreamEncryptAndStandardDecrypt,
        ("aes",))

  def testAesStreamEncryptAndStreamDecryptInterop(self):
    """
    Test streaming in both directions.

    NOTE: we only test the 1 set of data as the other tests exercise the stream
    readers/writers individually
    """
    self.__testStreamEncryptAndStreamDecrypt('aes', self.all_data[0],
                                             -1, -1, 'default')

  def testBadAesCiphertexts(self):
    crypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, "aes"))
    ciphertext = util.Base64WSDecode(crypter.Encrypt(self.input_data))
    bad = util.Base64WSEncode(chr(0))
    char = chr(ord(ciphertext[2]) ^ 44)
    ciphertext = util.Base64WSEncode(ciphertext[:2]+char+ciphertext[3:])
    self.assertRaises(errors.ShortCiphertextError, crypter.Decrypt, bad)
    self.assertRaises(errors.KeyNotFoundError, crypter.Decrypt, ciphertext)

  def testBadAesCiphertextsStream(self):
    crypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, "aes"))
    ciphertext = util.Base64WSDecode(crypter.Encrypt(self.input_data))
    bad = util.Base64WSEncode(chr(0))
    char = chr(ord(ciphertext[2]) ^ 44)
    ciphertext = util.Base64WSEncode(ciphertext[:2]+char+ciphertext[3:])

    try:
      stream = StringIO.StringIO(bad)
      decryption_stream = crypter.CreateDecryptingStreamReader(stream)
      self.__readFromStream(decryption_stream)
    except errors.ShortCiphertextError:
      pass

    try:
      stream = StringIO.StringIO(ciphertext)
      decryption_stream = crypter.CreateDecryptingStreamReader(stream)
      self.__readFromStream(decryption_stream)
    except errors.KeyNotFoundError:
      pass

  def testStreamDecryptHandlesIOModuleBlockingNoneReturned(self):
    """
    Test for input streams that conform to the blocking I/O module spec, i.e.
    read() returns None to indicate no data available, but not EOF
    """
    crypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, 'aes'))
    ciphertext = crypter.Encrypt(self.input_data)

    class PseudoBlockingStream(object):
      """
      A 'stream' that blocks every 2nd call to read() to simultate blocking i/o
      """

      def __init__(self, string):
        self.current_posn = 0
        self.string = string
        self.return_none = False

      def read(self, size=-1):
        result = None
        start = self.current_posn
        if not self.return_none:
          if size < 0:
            end = size
            self.current_posn = len(self.string)
          else:
            end = (start + size)
            self.current_posn = end
          result = self.string[start:end]
        else:
          if start > len(self.string):
            result = ''

        self.return_none = not self.return_none
        return result

    decryption_stream = crypter.CreateDecryptingStreamReader(
      PseudoBlockingStream(ciphertext))
    result = self.__readFromStream(decryption_stream, len_to_read=-1)
    self.assertEquals(self.input_data, result)

  def testStreamDecryptHandlesIOModuleBlockingExceptionRaised(self):
    """
    Test for input streams that conform to the blocking I/O module spec wrt
    buffered blocking, i.e. if the underlying raw stream is in non blocking-mode,
    a BlockingIOError is raised indicate no data available, but not EOF
    """
    crypter = keyczar.Crypter.Read(os.path.join(TEST_DATA, 'aes'))
    ciphertext = crypter.Encrypt(self.input_data)

    class PseudoBlockingStream(object):
      """
      A 'stream' that raises BlockingIOError every 2nd call to read() to
      simultate buffered blocking
      """

      def __init__(self, string):
        self.current_posn = 0
        self.string = string
        self.raise_exception = True

      def read(self, size=-1):
        result = None
        start = self.current_posn
        if not self.raise_exception:
          if size < 0:
            end = size
            self.current_posn = len(self.string)
          else:
            end = (start + size)
            self.current_posn = end
          result = self.string[start:end]
        else:
          if start > len(self.string):
            result = ''
          else:
            self.raise_exception = False
            raise util.BlockingIOError(1, 'Dummy error', self.current_posn)

        self.raise_exception = not self.raise_exception
        return result

    decryption_stream = crypter.CreateDecryptingStreamReader(
      PseudoBlockingStream(ciphertext))
    result = self.__readFromStream(decryption_stream, len_to_read=-1)
    self.assertEquals(self.input_data, result)

  def tearDown(self):
    self.input_data = None
    self.random_input_data = None
    self.random_input_data_len = 0

class PyCryptoCrypterTest(BaseCrypterTest):

  def setUp(self):
    keys.ACTIVE_CRYPT_LIB = 'pycrypto'
    super(PyCryptoCrypterTest, self).setUp()

class M2CryptoCrypterTest(BaseCrypterTest):

  def setUp(self):
    keys.ACTIVE_CRYPT_LIB = 'm2crypto'
    super(M2CryptoCrypterTest, self).setUp()

class PyCryptoM2CryptoInteropTest(unittest.TestCase):

  def setUp(self):
    self.input = "The quick brown fox was not quick enough and is now an UNFOX!"

  def testKeysizeInterop(self):
    s = self.input
    # test for all valid sizes
    for size in keyinfo.AES.sizes:
      # generate a new key of this size
      aeskey = keys.AesKey.Generate(size)

      # ensure PyCrypto chosen
      keys.ACTIVE_CRYPT_LIB = 'pycrypto'
      self.assertEquals(
        aeskey.Decrypt(aeskey.Encrypt(s)), s,
        'Cannot encrypt/decrypt with the same PyCrypto key! size:%s' %size
      )
      pycrypto_encrypted_str = aeskey.Encrypt(s)

      # now switch to M2Crypto
      keys.ACTIVE_CRYPT_LIB = 'm2crypto'
      self.assertEquals(
        aeskey.Decrypt(aeskey.Encrypt(s)), s,
        'Cannot encrypt/decrypt with the same M2Crypto key! size:%s' %size
      )
      m2crypto_encrypted_str = aeskey.Encrypt(s)

      self.assertEquals(
        aeskey.Decrypt(pycrypto_encrypted_str), s,
        'Cannot decrypt PyCrypto with M2Crypto key! size:%s' %size
      )
      self.assertEquals(
        aeskey.Decrypt(m2crypto_encrypted_str), s,
        'Cannot decrypt M2Crypto with M2Crypto key! size:%s' %size
      )

      # now switch to PyCrypto
      keys.ACTIVE_CRYPT_LIB = 'pycrypto'
      self.assertEquals(
        aeskey.Decrypt(pycrypto_encrypted_str), s,
        'Cannot decrypt PyCrypto with PyCrypto key! size:%s' %size
      )
      self.assertEquals(
        aeskey.Decrypt(m2crypto_encrypted_str), s,
        'Cannot decrypt M2Crypto with PyCrypto key! size:%s' %size
      )

class CreateReaderTest(unittest.TestCase):

  def testDefaultCreateReaderIgnoresUnsupported(self):
    self.assertRaises(errors.KeyczarError, readers.CreateReader,
                      'foo://this.is.unsupported')

  def testFileReaderDoesNotOpenNonExistantLocation(self):
    location = '/tmp/12312j3l'
    self.assertFalse(os.path.isdir(location))
    self.assertRaises(errors.KeyczarError, readers.CreateReader,
                      location)

  def testFileReaderOpensExistingLocation(self):
    location = os.path.join(TEST_DATA, 'aes')
    self.assertTrue(os.path.isdir(location))
    rdr = readers.CreateReader(location)
    self.assertTrue(rdr is not None)
    self.assertTrue(isinstance(rdr, readers.FileReader))

  def testCreateReaderDirectInstantiation(self):
    """
    Only FileReader can be instantiated by CreateReader.
    All others in readers.py must be instantiated via their constructor.
    """
    location = os.path.join(TEST_DATA, 'aes')
    self.assertTrue(os.path.isdir(location))
    # make sure all readers are available
    util.ImportBackends()
    # Check all non-FileReaders
    for sc in readers.Reader.__subclasses__():
      if not issubclass(sc, readers.FileReader):
        self.assertTrue(sc.CreateReader(location) is None)

  def testNewLocationTypeSupported(self):
    class BarReader(readers.Reader):
      def GetMetadata(self):
        return

      def GetKey(self, version_number):
        return

      def Close(self):
        return

      @classmethod
      def CreateReader(cls, location):
        if location.startswith('bar:'):
          return BarReader()

    rdr = readers.CreateReader('bar://this.should.be.created')
    self.assertTrue(isinstance(rdr, BarReader))

class CreateWriterTest(unittest.TestCase):

  def testDefaultCreateWriterIgnoresUnsupported(self):
    self.assertRaises(errors.KeyczarError, writers.CreateWriter,
                      'foo://this.is.unsupported')

  def testFileWriterDoesNotOpenNonExistantLocation(self):
    location = '/tmp/12312j3l'
    self.assertFalse(os.path.isdir(location))
    self.assertRaises(errors.KeyczarError, writers.CreateWriter,
                      location)

  def testFileWriterOpensExistingLocation(self):
    location = os.path.join(TEST_DATA, 'aes')
    self.assertTrue(os.path.isdir(location))
    wrtr = writers.CreateWriter(location)
    self.assertTrue(wrtr is not None)
    self.assertTrue(isinstance(wrtr, writers.FileWriter))

  def testCreateWriterDirectInstantiation(self):
    """
    Only FileWriter can be instantiated by CreateWriter.
    All others in writers.py must be instantiated via their constructor.
    """
    location = os.path.join(TEST_DATA, 'aes')
    self.assertTrue(os.path.isdir(location))
    # make sure all writers are available
    util.ImportBackends()
    # Check all non-FileWriters
    for sc in writers.Writer.__subclasses__():
      if not issubclass(sc, writers.FileWriter):
        self.assertIsNone(sc.CreateWriter(location))

  def testNewLocationTypeSupported(self):
    class BarWriter(writers.Writer):
      def WriteMetadata(self, metadata, overwrite=True):
        return

      def WriteKey(self, key, version_number, encrypter=None):
        return

      def Remove(self, version_number):
        return

      def Close(self):
        return

      @classmethod
      def CreateWriter(cls, location):
        if location.startswith('bar:'):
          return BarWriter()

    wrtr = writers.CreateWriter('bar://this.should.be.created')
    self.assertTrue(isinstance(wrtr, BarWriter))

def suite():
  alltests = unittest.TestSuite(
    [unittest.TestLoader().loadTestsFromTestCase(PyCryptoCrypterTest),
     unittest.TestLoader().loadTestsFromTestCase(M2CryptoCrypterTest),
     unittest.TestLoader().loadTestsFromTestCase(PyCryptoM2CryptoInteropTest),
     unittest.TestLoader().loadTestsFromTestCase(CreateReaderTest),
     unittest.TestLoader().loadTestsFromTestCase(CreateWriterTest)
    ])

  return alltests

if __name__ == "__main__":
  unittest.main(defaultTest='suite')