from contextlib import contextmanager
import datetime
from datetime import timedelta
from distutils.version import LooseVersion
import os
import tempfile
from warnings import catch_warnings, simplefilter
import numpy as np
import pytest
from pandas.compat import (
PY35, PY36, BytesIO, is_platform_little_endian, is_platform_windows,
lrange, range, text_type, u)
import pandas.util._test_decorators as td
from pandas.core.dtypes.common import is_categorical_dtype
import pandas as pd
from pandas import (
Categorical, DataFrame, DatetimeIndex, Index, Int64Index, MultiIndex,
Panel, RangeIndex, Series, Timestamp, bdate_range, compat, concat,
date_range, isna, timedelta_range)
import pandas.util.testing as tm
from pandas.util.testing import (
assert_frame_equal, assert_panel_equal, assert_series_equal, set_timezone)
from pandas.io import pytables as pytables # noqa:E402
from pandas.io.formats.printing import pprint_thing
from pandas.io.pytables import (
ClosedFileError, HDFStore, PossibleDataLossError, Term, read_hdf)
from pandas.io.pytables import TableIterator # noqa:E402
# Skip this whole module when PyTables ('tables') cannot be imported.
tables = pytest.importorskip('tables')
# TODO:
# remove when gh-24839 is fixed; this affects numpy 1.16
# and pytables 3.4.4
# Marker: expected failure whenever the installed numpy is 1.16 or newer.
xfail_non_writeable = pytest.mark.xfail(
LooseVersion(np.__version__) >= LooseVersion('1.16'),
reason=('gh-25511, gh-24839. pytables needs a '
'release beyong 3.4.4 to support numpy 1.16x'))
# Compressor name: 'blosc' when PyTables is >= 2.2, otherwise 'zlib'.
_default_compressor = ('blosc' if LooseVersion(tables.__version__) >=
LooseVersion('2.2') else 'zlib')
# Marker that filters out PyTables' NaturalNameWarning messages that start
# with "object name".
ignore_natural_naming_warning = pytest.mark.filterwarnings(
"ignore:object name:tables.exceptions.NaturalNameWarning"
)
# helpers and context managers that make sure temporary files get cleaned up
def safe_remove(path):
    """Delete the file at ``path`` on a best-effort basis.

    Parameters
    ----------
    path : str or None
        File to delete. ``None`` means there is nothing to do.

    Notes
    -----
    An ``OSError`` raised by ``os.remove`` (for instance because the file
    does not exist) is swallowed.
    """
    if path is None:
        return
    try:
        os.remove(path)
    except OSError:
        # cleanup is best-effort; a failed delete is not an error here
        return
def safe_close(store):
    """Close ``store`` on a best-effort basis.

    Parameters
    ----------
    store : object with a ``close()`` method, or None
        ``None`` means there is nothing to close.

    Notes
    -----
    An ``IOError`` raised by ``store.close()`` is swallowed.
    """
    if store is None:
        return
    try:
        store.close()
    except IOError:
        # closing is best-effort; ignore I/O failures during cleanup
        return
def create_tempfile(path):
    """Return ``path`` joined onto the system temporary directory.

    Nothing is created or opened on disk; only the file name is built.
    """
    tmp_dir = tempfile.gettempdir()
    return os.path.join(tmp_dir, path)
@contextmanager
def ensure_clean_store(path, mode='a', complevel=None, complib=None,
                       fletcher32=False):
    """Yield an open ``HDFStore`` and clean it up on exit.

    Parameters
    ----------
    path : str
        File name for the store. A bare file name (no directory part) is
        placed in the system temporary directory via ``create_tempfile``.
    mode : str, default 'a'
        Mode passed to ``HDFStore``.
    complevel, complib, fletcher32
        Passed through unchanged to ``HDFStore``.

    Yields
    ------
    HDFStore
        The opened store.

    Notes
    -----
    On exit the store is closed and, when ``mode`` is ``'w'`` or ``'a'``,
    the backing file is removed. Both steps are best-effort.
    """
    # put in the temporary path if we don't have one already
    if not os.path.dirname(path):
        path = create_tempfile(path)

    # Bind ``store`` before the ``try`` so that the cleanup below cannot
    # raise UnboundLocalError (and hide the real error) when opening fails.
    store = None
    try:
        # ``fletcher32`` is forwarded; it used to be hard-coded to False,
        # which silently ignored the caller's argument.
        store = HDFStore(path, mode=mode, complevel=complevel,
                         complib=complib, fletcher32=fletcher32)
        yield store
    finally:
        safe_close(store)
        if mode in ('w', 'a'):
            safe_remove(path)
@contextmanager
def ensure_clean_path(path):
    """
    Yield the name of an unopened temporary file and delete it on exit.

    Parameters
    ----------
    path : str or list of str
        File name(s); each one is mapped into the system temporary
        directory via ``create_tempfile``.

    Yields
    ------
    str or list of str
        A single file name when ``path`` is not a list, otherwise the
        list of file names (same order as ``path``).

    Notes
    -----
    On exit every generated file name is passed to ``safe_remove``.
    """
    as_list = isinstance(path, list)
    # Resolve the names before entering the ``try``: if this step fails,
    # the original exception surfaces instead of an UnboundLocalError on
    # ``filenames`` raised from the ``finally`` clause.
    filenames = [create_tempfile(p) for p in (path if as_list else [path])]
    try:
        yield filenames if as_list else filenames[0]
    finally:
        for f in filenames:
            safe_remove(f)
# set these parameters so we don't have file sharing
# (each of the PyTables thread-count parameters below is pinned to 1;
# this runs at import time and applies to the whole module)
tables.parameters.MAX_NUMEXPR_THREADS = 1
tables.parameters.MAX_BLOSC_THREADS = 1
tables.parameters.MAX_THREADS = 1
def _maybe_remove(store, key):
    """Remove ``key`` from ``store`` if possible.

    Used by tests that work with tables, so that no content written by an
    earlier test under the same table name is left behind. A ``KeyError``
    or ``ValueError`` raised by ``store.remove`` is ignored.
    """
    try:
        store.remove(key)
    except (KeyError, ValueError):
        # nothing to clean up under this key
        return
class Base(object):
    """Common setup/teardown hooks shared by the HDF test classes."""

    @classmethod
    def setup_class(cls):
        """Class-level setup: call ``tm.reset_testing_mode()``."""
        # Pytables 3.0.0 deprecates lots of things
        tm.reset_testing_mode()

    @classmethod
    def teardown_class(cls):
        """Class-level teardown: call ``tm.set_testing_mode()``."""
        # Pytables 3.0.0 deprecates lots of things
        tm.set_testing_mode()

    def setup_method(self, method):
        """Give every test its own file name in ``self.path``."""
        suffix = tm.rands(10)
        self.path = 'tmp.__%s__.h5' % suffix

    def teardown_method(self, method):
        """Nothing to undo after an individual test."""
        return None
@pytest.mark.single
@pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
class TestHDFStore(Base):
def test_format_kwarg_in_constructor(self):
    """``HDFStore(path, format='table')`` must raise ``ValueError``."""
    # GH 13291
    with ensure_clean_path(self.path) as tmp_path:
        with pytest.raises(ValueError):
            HDFStore(tmp_path, format='table')
def test_context(self):
    """Use ``HDFStore`` as a context manager, with and without an error."""
    path = create_tempfile(self.path)

    # A ValueError raised inside the ``with`` body is tolerated here; the
    # file is removed whatever happens.
    try:
        with HDFStore(path) as store:
            raise ValueError('blah')
    except ValueError:
        pass
    finally:
        safe_remove(path)

    # Write through one context, then re-open the file and read it back.
    try:
        with HDFStore(path) as writer:
            writer['a'] = tm.makeDataFrame()

        with HDFStore(path) as reader:
            assert len(reader) == 1
            assert type(reader['a']) == DataFrame
    finally:
        safe_remove(path)
def test_conv_read_write(self):
    """Round-trip several objects through ``to_hdf`` and ``read_hdf``."""
    path = create_tempfile(self.path)

    def roundtrip(key, obj, **kwargs):
        # write ``obj`` under ``key``, then read the same key back
        obj.to_hdf(path, key, **kwargs)
        return read_hdf(path, key)

    try:
        time_series = tm.makeTimeSeries()
        assert_series_equal(time_series, roundtrip('series', time_series))

        string_series = tm.makeStringSeries()
        assert_series_equal(string_series,
                            roundtrip('string_series', string_series))

        frame = tm.makeDataFrame()
        assert_frame_equal(frame, roundtrip('frame', frame))

        with catch_warnings(record=True):
            panel = tm.makePanel()
            assert_panel_equal(panel, roundtrip('panel', panel))

        # table
        df = DataFrame(dict(A=lrange(5), B=lrange(5)))
        df.to_hdf(path, 'table', append=True)
        selected = read_hdf(path, 'table', where=['index>2'])
        expected = df[df.index > 2]
        assert_frame_equal(expected, selected)
    finally:
        safe_remove(path)
def test_long_strings(self):
    """Append and read back a frame built from 100-character strings."""
    # GH6166
    values = tm.rands_array(100, size=10)
    labels = tm.rands_array(100, size=10)
    frame = DataFrame({'a': values}, index=labels)

    with ensure_clean_store(self.path) as store:
        store.append('df', frame, data_columns=['a'])
        stored = store.select('df')
        assert_frame_equal(frame, stored)
def test_api(self):
    """Exercise the ``append`` / ``format`` keyword combinations.

    Covers ``DataFrame.to_hdf`` with ``read_hdf`` and ``HDFStore.append``
    with ``HDFStore.select``, followed by the keyword combinations and the
    empty path that are expected to raise.
    """
    # GH4584
    # API issue when to_hdf doesn't accept append AND format args
    with ensure_clean_path(self.path) as path:
        frame = tm.makeDataFrame()
        head, tail = frame.iloc[:10], frame.iloc[10:]

        head.to_hdf(path, 'df', append=True, format='table')
        tail.to_hdf(path, 'df', append=True, format='table')
        assert_frame_equal(read_hdf(path, 'df'), frame)

        # append to False
        head.to_hdf(path, 'df', append=False, format='table')
        tail.to_hdf(path, 'df', append=True, format='table')
        assert_frame_equal(read_hdf(path, 'df'), frame)

    with ensure_clean_path(self.path) as path:
        frame = tm.makeDataFrame()
        head, tail = frame.iloc[:10], frame.iloc[10:]

        head.to_hdf(path, 'df', append=True)
        tail.to_hdf(path, 'df', append=True, format='table')
        assert_frame_equal(read_hdf(path, 'df'), frame)

        # append to False
        head.to_hdf(path, 'df', append=False, format='table')
        tail.to_hdf(path, 'df', append=True)
        assert_frame_equal(read_hdf(path, 'df'), frame)

    with ensure_clean_path(self.path) as path:
        frame = tm.makeDataFrame()
        # each keyword combination overwrites 'df' and must round-trip
        keyword_sets = (
            dict(append=False, format='fixed'),
            dict(append=False, format='f'),
            dict(append=False),
            dict(),
        )
        for keywords in keyword_sets:
            frame.to_hdf(path, 'df', **keywords)
            assert_frame_equal(read_hdf(path, 'df'), frame)

    with ensure_clean_store(self.path) as store:
        path = store._path
        frame = tm.makeDataFrame()
        head, tail = frame.iloc[:10], frame.iloc[10:]

        _maybe_remove(store, 'df')
        store.append('df', head, append=True, format='table')
        store.append('df', tail, append=True, format='table')
        assert_frame_equal(store.select('df'), frame)

        # append to False
        _maybe_remove(store, 'df')
        store.append('df', head, append=False, format='table')
        store.append('df', tail, append=True, format='table')
        assert_frame_equal(store.select('df'), frame)

        # formats
        _maybe_remove(store, 'df')
        store.append('df', head, append=False, format='table')
        store.append('df', tail, append=True, format='table')
        assert_frame_equal(store.select('df'), frame)

        _maybe_remove(store, 'df')
        store.append('df', head, append=False, format='table')
        store.append('df', tail, append=True, format=None)
        assert_frame_equal(store.select('df'), frame)

    with ensure_clean_path(self.path) as path:
        # invalid
        frame = tm.makeDataFrame()
        with pytest.raises(ValueError):
            frame.to_hdf(path, 'df', append=True, format='f')
        with pytest.raises(ValueError):
            frame.to_hdf(path, 'df', append=True, format='fixed')
        with pytest.raises(TypeError):
            frame.to_hdf(path, 'df', append=True, format='foo')
        with pytest.raises(TypeError):
            frame.to_hdf(path, 'df', append=False, format='bar')

        # File path doesn't exist
        path = ""
        with pytest.raises(compat.FileNotFoundError):
            read_hdf(path, 'df')
def test_api_default_format(self):
# default_format option
with ensure_clean_store(self.path) as store:
df = tm.makeDataFrame()
pd.set_option('io.hdf.default_format', 'fixed')
_maybe_remove(store, 'df')
store.put('df', df)
assert not store.get_storer('df').is_table
pytest.raises(ValueError, store.append, 'df2', df)
pd.set_option('io.hdf.default_format', 'table')
_maybe_remove(store, 'df')
store.put('df', df)
assert store.get_storer('df').is_table
_maybe_remove(store, 'df2')
store.append('df2', df)
assert store.get_storer('df').is_table
pd.set_option('io.hdf.default_format', None)
with ensure_clean_path(self.path) as path:
df = tm.makeDataFrame()
pd.set_option('io.hdf.default_format', 'fixed')
df.to_hdf(path, 'df')
with HDFStore(path) as store:
assert not store.get_storer('df').is_table
pytest.raises(ValueError, df.to_hdf, path, 'df2', append=True)
pd.set_option('io.hdf.default_format', 'table')
df.to_hdf(path, 'df3')
Loading ...