Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / libpython3.8-testsuite   deb

Repository URL to install this package:

Version: 3.8.5-1+stretch1 

/ usr / lib / python3.8 / test / test_lzma.py

import _compression
from io import BytesIO, UnsupportedOperation, DEFAULT_BUFFER_SIZE
import os
import pathlib
import pickle
import random
import sys
from test import support
import unittest

from test.support import (
    _4G, TESTFN, import_module, bigmemtest, run_unittest, unlink
)

lzma = import_module("lzma")
from lzma import LZMACompressor, LZMADecompressor, LZMAError, LZMAFile


class CompressorDecompressorTestCase(unittest.TestCase):

    # Test error cases.

    def test_simple_bad_args(self):
        # --- LZMACompressor constructor ---
        # Arguments of the wrong type are expected to raise TypeError.
        with self.assertRaises(TypeError):
            LZMACompressor([])
        for bad_kwargs in ({"format": 3.45}, {"check": ""},
                           {"preset": "asdf"}, {"filters": 3}):
            with self.assertRaises(TypeError):
                LZMACompressor(**bad_kwargs)
        # FORMAT_AUTO is rejected when compressing.
        with self.assertRaises(ValueError):
            LZMACompressor(format=lzma.FORMAT_AUTO)
        # A preset and a custom filter chain may not be given together.
        self.assertRaises(ValueError, LZMACompressor,
                          preset=7, filters=[{"id": lzma.FILTER_LZMA2}])

        # --- LZMADecompressor constructor ---
        with self.assertRaises(TypeError):
            LZMADecompressor(())
        with self.assertRaises(TypeError):
            LZMADecompressor(memlimit=b"qw")
        self.assertRaises(TypeError, LZMADecompressor,
                          lzma.FORMAT_RAW, filters="zzz")
        # A memory limit cannot be combined with FORMAT_RAW.
        self.assertRaises(ValueError, LZMADecompressor,
                          lzma.FORMAT_RAW, memlimit=0x1000000)
        # A custom filter chain is only accepted together with FORMAT_RAW:
        # the default format, FORMAT_XZ and FORMAT_ALONE must all refuse it.
        for fmt_kwargs in ({},
                           {"format": lzma.FORMAT_XZ},
                           {"format": lzma.FORMAT_ALONE}):
            with self.assertRaises(ValueError):
                LZMADecompressor(filters=FILTERS_RAW_1, **fmt_kwargs)

        # --- LZMACompressor methods ---
        compressor = LZMACompressor()
        with self.assertRaises(TypeError):
            compressor.compress()
        with self.assertRaises(TypeError):
            compressor.compress(b"foo", b"bar")
        with self.assertRaises(TypeError):
            compressor.flush(b"blah")
        flushed = compressor.flush()
        # After flush(), further compress()/flush() calls must fail.
        with self.assertRaises(ValueError):
            compressor.compress(b"quux")
        with self.assertRaises(ValueError):
            compressor.flush()

        # --- LZMADecompressor methods ---
        decompressor = LZMADecompressor()
        with self.assertRaises(TypeError):
            decompressor.decompress()
        with self.assertRaises(TypeError):
            decompressor.decompress(b"foo", b"bar")
        # Feed the stream produced above, then check that extra input
        # past the end of the stream is reported as EOFError.
        decompressor.decompress(flushed)
        with self.assertRaises(EOFError):
            decompressor.decompress(b"quux")

    def test_bad_filter_spec(self):
        self.assertRaises(TypeError, LZMACompressor, filters=[b"wobsite"])
        self.assertRaises(ValueError, LZMACompressor, filters=[{"xyzzy": 3}])
        self.assertRaises(ValueError, LZMACompressor, filters=[{"id": 98765}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_LZMA2, "foo": 0}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_DELTA, "foo": 0}])
        with self.assertRaises(ValueError):
            LZMACompressor(filters=[{"id": lzma.FILTER_X86, "foo": 0}])

    def test_decompressor_after_eof(self):
        # Consume one complete .xz stream, then verify that handing the
        # decompressor more input raises EOFError.
        decomp = LZMADecompressor()
        decomp.decompress(COMPRESSED_XZ)
        with self.assertRaises(EOFError):
            decomp.decompress(b"nyan")

    def test_decompressor_memlimit(self):
        # With memlimit=1024, decompressing the sample data is expected to
        # fail with LZMAError for the default format as well as for an
        # explicitly requested FORMAT_XZ or FORMAT_ALONE.
        scenarios = (
            ((), COMPRESSED_XZ),
            ((lzma.FORMAT_XZ,), COMPRESSED_XZ),
            ((lzma.FORMAT_ALONE,), COMPRESSED_ALONE),
        )
        for format_args, payload in scenarios:
            decomp = LZMADecompressor(*format_args, memlimit=1024)
            with self.assertRaises(LZMAError):
                decomp.decompress(payload)

    # Test LZMADecompressor on known-good input data.

    def _test_decompressor(self, lzd, data, check, unused_data=b""):
        # Shared helper: run *data* through *lzd* in a single decompress()
        # call and verify the result.
        #   lzd         -- decompressor that has not yet reached EOF
        #   data        -- compressed bytes expected to decode to INPUT
        #   check       -- integrity-check id expected in lzd.check
        #   unused_data -- bytes expected to be left over in lzd.unused_data
        self.assertFalse(lzd.eof)
        self.assertEqual(lzd.decompress(data), INPUT)
        self.assertEqual(lzd.check, check)
        self.assertTrue(lzd.eof)
        self.assertEqual(lzd.unused_data, unused_data)

    def test_decompressor_auto(self):
        # A decompressor built with default arguments must handle both the
        # .xz sample and the .lzma ("alone") sample.
        samples = (
            (COMPRESSED_XZ, lzma.CHECK_CRC64),
            (COMPRESSED_ALONE, lzma.CHECK_NONE),
        )
        for stream, expected_check in samples:
            self._test_decompressor(LZMADecompressor(), stream,
                                    expected_check)

    def test_decompressor_xz(self):
        # Explicit FORMAT_XZ on the .xz sample; CRC64 check expected.
        self._test_decompressor(
            LZMADecompressor(lzma.FORMAT_XZ),
            COMPRESSED_XZ,
            lzma.CHECK_CRC64,
        )

    def test_decompressor_alone(self):
        # Explicit FORMAT_ALONE on the "alone" sample; no integrity check
        # is expected to be reported.
        self._test_decompressor(
            LZMADecompressor(lzma.FORMAT_ALONE),
            COMPRESSED_ALONE,
            lzma.CHECK_NONE,
        )

    def test_decompressor_raw_1(self):
        # Raw stream decoded with filter chain FILTERS_RAW_1.
        raw_decomp = LZMADecompressor(lzma.FORMAT_RAW,
                                      filters=FILTERS_RAW_1)
        self._test_decompressor(raw_decomp, data=COMPRESSED_RAW_1,
                                check=lzma.CHECK_NONE)

    def test_decompressor_raw_2(self):
        # Raw stream decoded with filter chain FILTERS_RAW_2.
        raw_decomp = LZMADecompressor(lzma.FORMAT_RAW,
                                      filters=FILTERS_RAW_2)
        self._test_decompressor(raw_decomp, data=COMPRESSED_RAW_2,
                                check=lzma.CHECK_NONE)

    def test_decompressor_raw_3(self):
        # Raw stream decoded with filter chain FILTERS_RAW_3.
        raw_decomp = LZMADecompressor(lzma.FORMAT_RAW,
                                      filters=FILTERS_RAW_3)
        self._test_decompressor(raw_decomp, data=COMPRESSED_RAW_3,
                                check=lzma.CHECK_NONE)

    def test_decompressor_raw_4(self):
        # Raw stream decoded with filter chain FILTERS_RAW_4.
        raw_decomp = LZMADecompressor(lzma.FORMAT_RAW,
                                      filters=FILTERS_RAW_4)
        self._test_decompressor(raw_decomp, data=COMPRESSED_RAW_4,
                                check=lzma.CHECK_NONE)

    def test_decompressor_chunks(self):
        # Feed the .xz sample ten bytes at a time; EOF must not be
        # reported before the final slice has been supplied.
        decomp = LZMADecompressor()
        step = 10
        pieces = []
        start = 0
        while start < len(COMPRESSED_XZ):
            self.assertFalse(decomp.eof)
            piece = decomp.decompress(COMPRESSED_XZ[start:start + step])
            pieces.append(piece)
            start += step
        self.assertEqual(b"".join(pieces), INPUT)
        self.assertEqual(decomp.check, lzma.CHECK_CRC64)
        self.assertTrue(decomp.eof)
        self.assertEqual(decomp.unused_data, b"")

    def test_decompressor_chunks_empty(self):
        # Same as the chunked test, but every ten-byte slice is preceded
        # by three empty decompress() calls, which must not disturb the
        # result.
        decomp = LZMADecompressor()
        pieces = []
        for start in range(0, len(COMPRESSED_XZ), 10):
            self.assertFalse(decomp.eof)
            for feed in (b'', b'', b'', COMPRESSED_XZ[start:start+10]):
                pieces.append(decomp.decompress(feed))
        self.assertEqual(b"".join(pieces), INPUT)
        self.assertEqual(decomp.check, lzma.CHECK_CRC64)
        self.assertTrue(decomp.eof)
        self.assertEqual(decomp.unused_data, b"")

    def test_decompressor_chunks_maxsize(self):
        # Exercise the max_length argument: no call may return more than
        # `limit` bytes, and buffered data must be retrievable without
        # supplying new input.
        decomp = LZMADecompressor()
        limit = 100
        collected = []

        # Step 1: supply the first half of the compressed data.
        midpoint = len(COMPRESSED_XZ) // 2
        chunk = decomp.decompress(COMPRESSED_XZ[:midpoint],
                                  max_length=limit)
        collected.append(chunk)
        self.assertFalse(decomp.needs_input)
        self.assertEqual(len(chunk), limit)

        # Step 2: pull more output with no additional input.
        chunk = decomp.decompress(b'', max_length=limit)
        collected.append(chunk)
        self.assertFalse(decomp.needs_input)
        self.assertEqual(len(chunk), limit)

        # Step 3: pull more output while supplying the second half.
        chunk = decomp.decompress(COMPRESSED_XZ[midpoint:],
                                  max_length=limit)
        collected.append(chunk)
        self.assertLessEqual(len(chunk), limit)

        # Step 4: drain whatever is left until end of stream.
        while not decomp.eof:
            chunk = decomp.decompress(b'', max_length=limit)
            collected.append(chunk)
            self.assertLessEqual(len(chunk), limit)

        self.assertEqual(b"".join(collected), INPUT)
        self.assertEqual(decomp.check, lzma.CHECK_CRC64)
        self.assertEqual(decomp.unused_data, b"")

    def test_decompressor_inputbuf_1(self):
        # Scenario: the input buffer is reused after its existing
        # contents have been moved to the beginning.
        decomp = LZMADecompressor()

        # Create and fill the input buffer; max_length=0 means no output
        # may be returned yet.
        primed = decomp.decompress(COMPRESSED_XZ[:100], max_length=0)
        self.assertEqual(primed, b'')

        pieces = [
            # Retrieve some results, freeing capacity at the beginning
            # of the input buffer.
            decomp.decompress(b'', 2),
            # Add data that fits into the buffer once the existing data
            # has been moved to the beginning.
            decomp.decompress(COMPRESSED_XZ[100:105], 15),
            # Decompress everything that remains.
            decomp.decompress(COMPRESSED_XZ[105:]),
        ]
        self.assertEqual(b''.join(pieces), INPUT)

    def test_decompressor_inputbuf_2(self):
        # Scenario: the input buffer is reused by appending new data at
        # the end right away.
        decomp = LZMADecompressor()

        # Create the input buffer (no output allowed yet) ...
        primed = decomp.decompress(COMPRESSED_XZ[:200], max_length=0)
        self.assertEqual(primed, b'')

        pieces = [
            # ... and empty it.
            decomp.decompress(b''),
            # Fill the buffer with new data.
            decomp.decompress(COMPRESSED_XZ[200:280], 2),
            # Append some more data, not enough to require a resize.
            decomp.decompress(COMPRESSED_XZ[280:300], 2),
            # Decompress everything that remains.
            decomp.decompress(COMPRESSED_XZ[300:]),
        ]
        self.assertEqual(b''.join(pieces), INPUT)

    def test_decompressor_inputbuf_3(self):
        # Scenario: the input buffer is reused after it has been
        # extended.
        decomp = LZMADecompressor()
        pieces = [
            # Create an almost full input buffer.
            decomp.decompress(COMPRESSED_XZ[:200], 5),
            # Add even more data to it, requiring a resize.
            decomp.decompress(COMPRESSED_XZ[200:300], 5),
            # Decompress everything that remains.
            decomp.decompress(COMPRESSED_XZ[300:]),
        ]
        self.assertEqual(b''.join(pieces), INPUT)

    def test_decompressor_unused_data(self):
        # Bytes following the end of the stream must show up, untouched,
        # in unused_data.
        trailing = b"fooblibar"
        payload = COMPRESSED_XZ + trailing
        self._test_decompressor(LZMADecompressor(), payload,
                                lzma.CHECK_CRC64, unused_data=trailing)

    def test_decompressor_bad_input(self):
        # Each decompressor is handed data in a format it was not
        # configured for and is expected to raise LZMAError.

        # Default format given a raw stream.
        auto_decomp = LZMADecompressor()
        with self.assertRaises(LZMAError):
            auto_decomp.decompress(COMPRESSED_RAW_1)

        # FORMAT_XZ given "alone" data.
        xz_decomp = LZMADecompressor(lzma.FORMAT_XZ)
        with self.assertRaises(LZMAError):
            xz_decomp.decompress(COMPRESSED_ALONE)

        # FORMAT_ALONE given .xz data.
        alone_decomp = LZMADecompressor(lzma.FORMAT_ALONE)
        with self.assertRaises(LZMAError):
            alone_decomp.decompress(COMPRESSED_XZ)

        # FORMAT_RAW given .xz data.
        raw_decomp = LZMADecompressor(lzma.FORMAT_RAW,
                                      filters=FILTERS_RAW_1)
        with self.assertRaises(LZMAError):
            raw_decomp.decompress(COMPRESSED_XZ)

    def test_decompressor_bug_28275(self):
        # Regression coverage for Issue 28275: after a failed
        # decompress(), a second call on the same object could
        # previously crash due to internal inconsistency. Both attempts
        # must simply raise LZMAError.
        decomp = LZMADecompressor()
        for _attempt in range(2):
            with self.assertRaises(LZMAError):
                decomp.decompress(COMPRESSED_RAW_1)

    # Test that LZMACompressor->LZMADecompressor preserves the input data.

    def test_roundtrip_xz(self):
        # Compress INPUT with default settings and check that a default
        # decompressor recovers it, reporting a CRC64 check.
        comp = LZMACompressor()
        body = comp.compress(INPUT)
        tail = comp.flush()
        self._test_decompressor(LZMADecompressor(), body + tail,
                                lzma.CHECK_CRC64)

    def test_roundtrip_alone(self):
        # Compress INPUT as FORMAT_ALONE and check that a default
        # decompressor recovers it, reporting no integrity check.
        comp = LZMACompressor(lzma.FORMAT_ALONE)
        body = comp.compress(INPUT)
        tail = comp.flush()
        self._test_decompressor(LZMADecompressor(), body + tail,
                                lzma.CHECK_NONE)

    def test_roundtrip_raw(self):
        # Raw-format round trip using the same filter chain
        # (FILTERS_RAW_4) on both the compressing and decompressing side.
        comp = LZMACompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
        body = comp.compress(INPUT)
        tail = comp.flush()
        raw_decomp = LZMADecompressor(lzma.FORMAT_RAW,
                                      filters=FILTERS_RAW_4)
        self._test_decompressor(raw_decomp, body + tail, lzma.CHECK_NONE)

    def test_roundtrip_raw_empty(self):
        # Raw-format round trip in which three empty compress() calls are
        # made between the real data and flush(); the decoded result
        # must be unaffected.
        comp = LZMACompressor(lzma.FORMAT_RAW, filters=FILTERS_RAW_4)
        parts = [comp.compress(INPUT)]
        for _ in range(3):
            parts.append(comp.compress(b''))
        parts.append(comp.flush())
        raw_decomp = LZMADecompressor(lzma.FORMAT_RAW,
                                      filters=FILTERS_RAW_4)
        self._test_decompressor(raw_decomp, b"".join(parts),
                                lzma.CHECK_NONE)

    def test_roundtrip_chunks(self):
        # Compress INPUT ten bytes at a time, then decompress the joined
        # output in one go.
        comp = LZMACompressor()
        parts = [comp.compress(INPUT[pos:pos + 10])
                 for pos in range(0, len(INPUT), 10)]
        parts.append(comp.flush())
        self._test_decompressor(LZMADecompressor(), b"".join(parts),
                                lzma.CHECK_CRC64)

    def test_roundtrip_empty_chunks(self):
        # Chunked compression where every ten-byte slice is followed by
        # three empty compress() calls; the round trip must still
        # reproduce INPUT.
        comp = LZMACompressor()
        parts = []
        for pos in range(0, len(INPUT), 10):
            for feed in (INPUT[pos:pos+10], b'', b'', b''):
                parts.append(comp.compress(feed))
        parts.append(comp.flush())
        self._test_decompressor(LZMADecompressor(), b"".join(parts),
                                lzma.CHECK_CRC64)

    # LZMADecompressor intentionally does not handle concatenated streams.

    def test_decompressor_multistream(self):
        # Two streams back to back: only the first is expected to be
        # decoded, with the whole second stream left in unused_data.
        first_stream = COMPRESSED_XZ
        second_stream = COMPRESSED_ALONE
        self._test_decompressor(LZMADecompressor(),
                                first_stream + second_stream,
                                lzma.CHECK_CRC64,
                                unused_data=second_stream)

    # Test with inputs larger than 4GiB.

    @support.skip_if_pgo_task
    @bigmemtest(size=_4G + 100, memuse=2)
    def test_compressor_bigmem(self, size):
        lzc = LZMACompressor()
        cdata = lzc.compress(b"x" * size) + lzc.flush()
        ddata = lzma.decompress(cdata)
        try:
            self.assertEqual(len(ddata), size)
            self.assertEqual(len(ddata.strip(b"x")), 0)
        finally:
Loading ...