from __future__ import division, absolute_import, print_function
import sys
import gzip
import os
import threading
import time
import warnings
import io
import re
import pytest
from tempfile import NamedTemporaryFile
from io import BytesIO, StringIO
from datetime import datetime
import locale
import numpy as np
import numpy.ma as ma
from numpy.lib._iotools import ConverterError, ConversionWarning
from numpy.compat import asbytes, bytes, Path
from numpy.ma.testutils import assert_equal
from numpy.testing import (
assert_warns, assert_, assert_raises_regex, assert_raises,
assert_allclose, assert_array_equal, temppath, tempdir, IS_PYPY,
HAS_REFCOUNT, suppress_warnings, assert_no_gc_cycles,
)
class TextIO(BytesIO):
    """Helper IO class.

    Everything handed to the constructor, `write` or `writelines` is passed
    through `asbytes` before being stored, and reads return bytes.
    This makes it easier to emulate files opened in binary mode
    without needing to explicitly convert strings to bytes in
    setting up the test data.

    """

    def __init__(self, s=""):
        """Create the buffer with `s` (converted by `asbytes`) as content."""
        BytesIO.__init__(self, asbytes(s))

    def write(self, s):
        """Convert `s` with `asbytes` and write it to the buffer.

        Returns whatever `BytesIO.write` returns (the number of bytes
        written) instead of silently dropping it, so callers that rely on
        the usual file `write` contract keep working.
        """
        return BytesIO.write(self, asbytes(s))

    def writelines(self, lines):
        """Convert every item of `lines` with `asbytes` and write them all."""
        BytesIO.writelines(self, [asbytes(s) for s in lines])
# Major and minor version numbers of the running interpreter.
MAJVER = sys.version_info[0]
MINVER = sys.version_info[1]

# True when sys.maxsize exceeds 2**32; used as the 64-bit platform check.
IS_64BIT = sys.maxsize > (1 << 32)

# Record whether the optional compression modules can be imported.
try:
    import bz2
except ImportError:
    HAS_BZ2 = False
else:
    HAS_BZ2 = True

try:
    import lzma
except ImportError:
    HAS_LZMA = False
else:
    HAS_LZMA = True
def strptime(s, fmt=None):
    """
    Parse a date given as text or bytes into a `datetime`.

    Parameters
    ----------
    s : str or bytes
        Date to parse. Bytes (including instances of bytes subclasses)
        are decoded as latin1 first.
    fmt : str, optional
        Format specification, handed unchanged to `time.strptime`.

    Returns
    -------
    datetime
        Built from the year, month and day of the parsed value only; any
        time-of-day fields are dropped.

    """
    # isinstance (rather than an exact type comparison) so that subclasses
    # of bytes are decoded too instead of reaching time.strptime as bytes.
    if isinstance(s, bytes):
        s = s.decode("latin1")
    return datetime(*time.strptime(s, fmt)[:3])
class RoundtripTest(object):
    """Shared save/load round-trip checks.

    Subclasses override `roundtrip` to supply a concrete save function and
    to compare ``self.arr`` with ``self.arr_reloaded``.
    """

    def roundtrip(self, save_func, *args, **kwargs):
        """
        Save `args` with `save_func` and load the result with `numpy.load`.

        The saved arrays and the loaded object are stored on the instance
        as ``self.arr`` and ``self.arr_reloaded``.

        Parameters
        ----------
        save_func : callable
            Function used to save arrays to file.
        file_on_disk : bool
            If true, store the file on disk, instead of in a
            string buffer.
        save_kwds : dict
            Parameters passed to `save_func`.
        load_kwds : dict
            Parameters passed to `numpy.load`.
        args : tuple of arrays
            Arrays stored to file.

        """
        save_kwds = kwargs.get('save_kwds', {})
        load_kwds = kwargs.get('load_kwds', {})
        file_on_disk = kwargs.get('file_on_disk', False)

        if file_on_disk:
            target_file = NamedTemporaryFile(delete=False)
            load_file = target_file.name
        else:
            target_file = BytesIO()
            load_file = target_file

        # Stays None when saving or loading fails, so that the cleanup
        # below still removes the temporary file in that case.
        arr_reloaded = None
        try:
            arr = args

            save_func(target_file, *arr, **save_kwds)
            target_file.flush()
            target_file.seek(0)

            if sys.platform == 'win32' and file_on_disk:
                target_file.close()

            arr_reloaded = np.load(load_file, **load_kwds)

            self.arr = arr
            self.arr_reloaded = arr_reloaded
        finally:
            if file_on_disk:
                target_file.close()
                # An NpzFile holds an open file descriptor so the file
                # can't be deleted on win; its removal is left to the
                # caller. Everything else is removed here.
                if not isinstance(arr_reloaded, np.lib.npyio.NpzFile):
                    os.remove(target_file.name)

    def check_roundtrips(self, a):
        """Round-trip `a` in several memory layouts, in memory and on disk."""
        self.roundtrip(a)
        self.roundtrip(a, file_on_disk=True)
        self.roundtrip(np.asfortranarray(a))
        self.roundtrip(np.asfortranarray(a), file_on_disk=True)
        if a.shape[0] > 1:
            # neither C nor Fortran contiguous for 2D arrays or more
            self.roundtrip(np.asfortranarray(a)[1:])
            self.roundtrip(np.asfortranarray(a)[1:], file_on_disk=True)

    def test_array(self):
        """Round-trip empty, float, int and complex arrays."""
        a = np.array([], float)
        self.check_roundtrips(a)

        a = np.array([[1, 2], [3, 4]], float)
        self.check_roundtrips(a)

        a = np.array([[1, 2], [3, 4]], int)
        self.check_roundtrips(a)

        a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.csingle)
        self.check_roundtrips(a)

        a = np.array([[1 + 5j, 2 + 6j], [3 + 7j, 4 + 8j]], dtype=np.cdouble)
        self.check_roundtrips(a)

    def test_array_object(self):
        """Round-trip empty and 2-D object arrays."""
        a = np.array([], object)
        self.check_roundtrips(a)

        a = np.array([[1, 2], [3, 4]], object)
        self.check_roundtrips(a)

    def test_1D(self):
        """Round-trip a one-dimensional integer array."""
        a = np.array([1, 2, 3, 4], int)
        self.roundtrip(a)

    @pytest.mark.skipif(sys.platform == 'win32', reason="Fails on Win32")
    def test_mmap(self):
        """Round-trip on disk while loading with ``mmap_mode='r'``."""
        a = np.array([[1, 2.5], [4, 7.3]])
        self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'})

        a = np.asfortranarray([[1, 2.5], [4, 7.3]])
        self.roundtrip(a, file_on_disk=True, load_kwds={'mmap_mode': 'r'})

    def test_record(self):
        """Round-trip a structured array with two int32 fields."""
        a = np.array([(1, 2), (3, 4)], dtype=[('x', 'i4'), ('y', 'i4')])
        self.check_roundtrips(a)

    @pytest.mark.slow
    def test_format_2_0(self):
        """Round-trip a structured array with 500 long-named fields."""
        # Each field name is the field index repeated 100 times.
        dt = [(("%d" % i) * 100, float) for i in range(500)]
        a = np.ones(1000, dtype=dt)
        # Record (and thereby swallow) any UserWarning raised on the way.
        with warnings.catch_warnings(record=True):
            warnings.filterwarnings('always', '', UserWarning)
            self.check_roundtrips(a)
class TestSaveLoad(RoundtripTest):
    """Round-trip checks using `np.save` as the save function."""

    def roundtrip(self, *args, **kwargs):
        """Save with `np.save`, reload, and compare against the first array.

        Checks that values, dtype and the ``flags.fnc`` layout flag of the
        reloaded array match those of the first array that was saved.
        """
        RoundtripTest.roundtrip(self, np.save, *args, **kwargs)
        original, restored = self.arr[0], self.arr_reloaded
        assert_equal(original, restored)
        assert_equal(original.dtype, restored.dtype)
        assert_equal(original.flags.fnc, restored.flags.fnc)
class TestSavezLoad(RoundtripTest):
    """Round-trip checks using `np.savez`, plus npz-specific tests."""

    def roundtrip(self, *args, **kwargs):
        """Save with `np.savez`, reload, and compare every saved array.

        Arrays are looked up in the loaded archive under the keys
        ``arr_0``, ``arr_1``, ... and compared for value, dtype and the
        ``flags.fnc`` layout flag.
        """
        RoundtripTest.roundtrip(self, np.savez, *args, **kwargs)
        try:
            for n, arr in enumerate(self.arr):
                reloaded = self.arr_reloaded['arr_%d' % n]
                assert_equal(arr, reloaded)
                assert_equal(arr.dtype, reloaded.dtype)
                assert_equal(arr.flags.fnc, reloaded.flags.fnc)
        finally:
            # delete tempfile, must be done here on windows
            if self.arr_reloaded.fid:
                self.arr_reloaded.fid.close()
                os.remove(self.arr_reloaded.fid.name)

    @pytest.mark.skipif(not IS_64BIT, reason="Needs 64bit platform")
    @pytest.mark.slow
    def test_big_arrays(self):
        """Save and reload a uint8 array of a bit more than 2**31 items."""
        L = (1 << 31) + 100000
        a = np.empty(L, dtype=np.uint8)
        with temppath(prefix="numpy_test_big_arrays_", suffix=".npz") as tmp:
            np.savez(tmp, a=a)
            del a
            # The context manager closes the archive even if the lookup
            # fails, so the temporary file can always be removed.
            with np.load(tmp) as npfile:
                a = npfile['a']  # Should succeed
            del a  # Avoid pyflakes unused variable warning.

    def test_multiple_arrays(self):
        """Round-trip two arrays saved positionally in one archive."""
        a = np.array([[1, 2], [3, 4]], float)
        b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex)
        self.roundtrip(a, b)

    def test_named_arrays(self):
        """Arrays saved by keyword are retrievable under those names."""
        a = np.array([[1, 2], [3, 4]], float)
        b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex)
        c = BytesIO()
        np.savez(c, file_a=a, file_b=b)
        c.seek(0)
        with np.load(c) as npz:
            assert_equal(a, npz['file_a'])
            assert_equal(b, npz['file_b'])

    def test_BagObj(self):
        """Arrays saved by keyword are exposed as attributes of ``.f``."""
        a = np.array([[1, 2], [3, 4]], float)
        b = np.array([[1 + 2j, 2 + 7j], [3 - 6j, 4 + 12j]], complex)
        c = BytesIO()
        np.savez(c, file_a=a, file_b=b)
        c.seek(0)
        with np.load(c) as npz:
            assert_equal(sorted(dir(npz.f)), ['file_a','file_b'])
            assert_equal(a, npz.f.file_a)
            assert_equal(b, npz.f.file_b)

    def test_savez_filename_clashes(self):
        # Test that issue #852 is fixed
        # and savez functions in multithreaded environment

        def writer(error_list):
            with temppath(suffix='.npz') as tmp:
                arr = np.random.randn(500, 500)
                try:
                    np.savez(tmp, arr=arr)
                except OSError as err:
                    error_list.append(err)

        errors = []
        threads = [threading.Thread(target=writer, args=(errors,))
                   for _ in range(3)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        if errors:
            raise AssertionError(errors)

    def test_not_closing_opened_fid(self):
        # Test that issue #2178 is fixed:
        # verify could seek on 'loaded' file
        with temppath(suffix='.npz') as tmp:
            with open(tmp, 'wb') as fp:
                np.savez(fp, data='LOVELY LOAD')
            with open(tmp, 'rb', 10000) as fp:
                fp.seek(0)
                assert_(not fp.closed)
                np.load(fp)['data']
                # fp must not get closed by .load
                assert_(not fp.closed)
                fp.seek(0)
                assert_(not fp.closed)

    #FIXME: Is this still true?
    @pytest.mark.skipif(IS_PYPY, reason="Missing context manager on PyPy")
    def test_closing_fid(self):
        # Test that issue #1517 (too many opened files) remains closed
        # It might be a "weak" test since failed to get triggered on
        # e.g. Debian sid of 2012 Jul 05 but was reported to
        # trigger the failure on Ubuntu 10.04:
        # http://projects.scipy.org/numpy/ticket/1517#comment:2
        with temppath(suffix='.npz') as tmp:
            np.savez(tmp, data='LOVELY LOAD')
            # We need to check if the garbage collector can properly close
            # numpy npz file returned by np.load when their reference count
            # goes to zero.  Python 3 running in debug mode raises a
            # ResourceWarning when file closing is left to the garbage
            # collector, so we catch the warnings.  Because ResourceWarning
            # is unknown in Python < 3.x, we take the easy way out and
            # catch all warnings.
            with suppress_warnings() as sup:
                sup.filter(Warning)  # TODO: specify exact message
                # The archive is deliberately not closed explicitly here:
                # closing is what this test leaves to the garbage collector.
                for _ in range(1, 1025):
                    try:
                        np.load(tmp)["data"]
                    except Exception as e:
                        msg = "Failed to load data from a file: %s" % e
                        raise AssertionError(msg)

    def test_closing_zipfile_after_load(self):
        # Check that zipfile owns file and can close it.  This needs to
        # pass a file name to load for the test.  On Windows, a failure
        # will cause a second error to be raised when the attempt to
        # remove the still-open file is made.
        prefix = 'numpy_test_closing_zipfile_after_load_'
        with temppath(suffix='.npz', prefix=prefix) as tmp:
            np.savez(tmp, lab='place holder')
            data = np.load(tmp)
            fp = data.zip.fp
            data.close()
            assert_(fp.closed)
class TestSaveTxt(object):
def test_array(self):
a = np.array([[1, 2], [3, 4]], float)
fmt = "%.18e"
c = BytesIO()
np.savetxt(c, a, fmt=fmt)
c.seek(0)
assert_equal(c.readlines(),
[asbytes((fmt + ' ' + fmt + '\n') % (1, 2)),
asbytes((fmt + ' ' + fmt + '\n') % (3, 4))])
a = np.array([[1, 2], [3, 4]], int)
c = BytesIO()
np.savetxt(c, a, fmt='%d')
c.seek(0)
assert_equal(c.readlines(), [b'1 2\n', b'3 4\n'])
def test_1D(self):
a = np.array([1, 2, 3, 4], int)
c = BytesIO()
np.savetxt(c, a, fmt='%d')
c.seek(0)
lines = c.readlines()
assert_equal(lines, [b'1\n', b'2\n', b'3\n', b'4\n'])
def test_0D_3D(self):
Loading ...