from datetime import timedelta
import re
import numpy as np
import pytest
import pandas as pd
from pandas import (
DataFrame,
Index,
Int64Index,
Series,
Timedelta,
TimedeltaIndex,
date_range,
timedelta_range,
)
import pandas.util.testing as tm
from pandas.util.testing import (
assert_almost_equal,
assert_index_equal,
assert_series_equal,
)
from ..datetimelike import DatetimeLike
randn = np.random.randn
class TestTimedeltaIndex(DatetimeLike):
_holder = TimedeltaIndex
def setup_method(self, method):
self.indices = dict(index=tm.makeTimedeltaIndex(10))
self.setup_indices()
def create_index(self):
return pd.to_timedelta(range(5), unit="d") + pd.offsets.Hour(1)
def test_numeric_compat(self):
# Dummy method to override super's version; this test is now done
# in test_arithmetic.py
pass
def test_shift(self):
pass # this is handled in test_arithmetic.py
def test_pickle_compat_construction(self):
pass
def test_fillna_timedelta(self):
# GH 11343
idx = pd.TimedeltaIndex(["1 day", pd.NaT, "3 day"])
exp = pd.TimedeltaIndex(["1 day", "2 day", "3 day"])
tm.assert_index_equal(idx.fillna(pd.Timedelta("2 day")), exp)
exp = pd.TimedeltaIndex(["1 day", "3 hour", "3 day"])
idx.fillna(pd.Timedelta("3 hour"))
exp = pd.Index(
[pd.Timedelta("1 day"), "x", pd.Timedelta("3 day")], dtype=object
)
tm.assert_index_equal(idx.fillna("x"), exp)
@pytest.mark.parametrize("sort", [None, False])
def test_difference_freq(self, sort):
# GH14323: Difference of TimedeltaIndex should not preserve frequency
index = timedelta_range("0 days", "5 days", freq="D")
other = timedelta_range("1 days", "4 days", freq="D")
expected = TimedeltaIndex(["0 days", "5 days"], freq=None)
idx_diff = index.difference(other, sort)
tm.assert_index_equal(idx_diff, expected)
tm.assert_attr_equal("freq", idx_diff, expected)
other = timedelta_range("2 days", "5 days", freq="D")
idx_diff = index.difference(other, sort)
expected = TimedeltaIndex(["0 days", "1 days"], freq=None)
tm.assert_index_equal(idx_diff, expected)
tm.assert_attr_equal("freq", idx_diff, expected)
@pytest.mark.parametrize("sort", [None, False])
def test_difference_sort(self, sort):
index = pd.TimedeltaIndex(
["5 days", "3 days", "2 days", "4 days", "1 days", "0 days"]
)
other = timedelta_range("1 days", "4 days", freq="D")
idx_diff = index.difference(other, sort)
expected = TimedeltaIndex(["5 days", "0 days"], freq=None)
if sort is None:
expected = expected.sort_values()
tm.assert_index_equal(idx_diff, expected)
tm.assert_attr_equal("freq", idx_diff, expected)
other = timedelta_range("2 days", "5 days", freq="D")
idx_diff = index.difference(other, sort)
expected = TimedeltaIndex(["1 days", "0 days"], freq=None)
if sort is None:
expected = expected.sort_values()
tm.assert_index_equal(idx_diff, expected)
tm.assert_attr_equal("freq", idx_diff, expected)
def test_isin(self):
index = tm.makeTimedeltaIndex(4)
result = index.isin(index)
assert result.all()
result = index.isin(list(index))
assert result.all()
assert_almost_equal(
index.isin([index[2], 5]), np.array([False, False, True, False])
)
def test_factorize(self):
idx1 = TimedeltaIndex(["1 day", "1 day", "2 day", "2 day", "3 day", "3 day"])
exp_arr = np.array([0, 0, 1, 1, 2, 2], dtype=np.intp)
exp_idx = TimedeltaIndex(["1 day", "2 day", "3 day"])
arr, idx = idx1.factorize()
tm.assert_numpy_array_equal(arr, exp_arr)
tm.assert_index_equal(idx, exp_idx)
arr, idx = idx1.factorize(sort=True)
tm.assert_numpy_array_equal(arr, exp_arr)
tm.assert_index_equal(idx, exp_idx)
# freq must be preserved
idx3 = timedelta_range("1 day", periods=4, freq="s")
exp_arr = np.array([0, 1, 2, 3], dtype=np.intp)
arr, idx = idx3.factorize()
tm.assert_numpy_array_equal(arr, exp_arr)
tm.assert_index_equal(idx, idx3)
def test_join_self(self, join_type):
index = timedelta_range("1 day", periods=10)
joined = index.join(index, how=join_type)
tm.assert_index_equal(index, joined)
def test_does_not_convert_mixed_integer(self):
df = tm.makeCustomDataframe(
10,
10,
data_gen_f=lambda *args, **kwargs: randn(),
r_idx_type="i",
c_idx_type="td",
)
str(df)
cols = df.columns.join(df.index, how="outer")
joined = cols.join(df.columns)
assert cols.dtype == np.dtype("O")
assert cols.dtype == joined.dtype
tm.assert_index_equal(cols, joined)
def test_sort_values(self):
idx = TimedeltaIndex(["4d", "1d", "2d"])
ordered = idx.sort_values()
assert ordered.is_monotonic
ordered = idx.sort_values(ascending=False)
assert ordered[::-1].is_monotonic
ordered, dexer = idx.sort_values(return_indexer=True)
assert ordered.is_monotonic
tm.assert_numpy_array_equal(dexer, np.array([1, 2, 0]), check_dtype=False)
ordered, dexer = idx.sort_values(return_indexer=True, ascending=False)
assert ordered[::-1].is_monotonic
tm.assert_numpy_array_equal(dexer, np.array([0, 2, 1]), check_dtype=False)
def test_get_duplicates(self):
idx = TimedeltaIndex(["1 day", "2 day", "2 day", "3 day", "3day", "4day"])
with tm.assert_produces_warning(FutureWarning):
# Deprecated - see GH20239
result = idx.get_duplicates()
ex = TimedeltaIndex(["2 day", "3day"])
tm.assert_index_equal(result, ex)
def test_argmin_argmax(self):
idx = TimedeltaIndex(["1 day 00:00:05", "1 day 00:00:01", "1 day 00:00:02"])
assert idx.argmin() == 1
assert idx.argmax() == 0
def test_misc_coverage(self):
rng = timedelta_range("1 day", periods=5)
result = rng.groupby(rng.days)
assert isinstance(list(result.values())[0][0], Timedelta)
idx = TimedeltaIndex(["3d", "1d", "2d"])
assert not idx.equals(list(idx))
non_td = Index(list("abc"))
assert not idx.equals(list(non_td))
def test_map(self):
# test_map_dictlike generally tests
rng = timedelta_range("1 day", periods=10)
f = lambda x: x.days
result = rng.map(f)
exp = Int64Index([f(x) for x in rng])
tm.assert_index_equal(result, exp)
def test_pass_TimedeltaIndex_to_index(self):
rng = timedelta_range("1 days", "10 days")
idx = Index(rng, dtype=object)
expected = Index(rng.to_pytimedelta(), dtype=object)
tm.assert_numpy_array_equal(idx.values, expected.values)
def test_pickle(self):
rng = timedelta_range("1 days", periods=10)
rng_p = tm.round_trip_pickle(rng)
tm.assert_index_equal(rng, rng_p)
def test_hash_error(self):
index = timedelta_range("1 days", periods=10)
with pytest.raises(
TypeError, match=("unhashable type: {0.__name__!r}".format(type(index)))
):
hash(index)
def test_append_join_nondatetimeindex(self):
rng = timedelta_range("1 days", periods=10)
idx = Index(["a", "b", "c", "d"])
result = rng.append(idx)
assert isinstance(result[0], Timedelta)
# it works
rng.join(idx, how="outer")
def test_append_numpy_bug_1681(self):
td = timedelta_range("1 days", "10 days", freq="2D")
a = DataFrame()
c = DataFrame({"A": "foo", "B": td}, index=td)
str(c)
result = a.append(c)
assert (result["B"] == td).all()
def test_fields(self):
rng = timedelta_range("1 days, 10:11:12.100123456", periods=2, freq="s")
tm.assert_index_equal(rng.days, Index([1, 1], dtype="int64"))
tm.assert_index_equal(
rng.seconds,
Index([10 * 3600 + 11 * 60 + 12, 10 * 3600 + 11 * 60 + 13], dtype="int64"),
)
tm.assert_index_equal(
rng.microseconds, Index([100 * 1000 + 123, 100 * 1000 + 123], dtype="int64")
)
tm.assert_index_equal(rng.nanoseconds, Index([456, 456], dtype="int64"))
msg = "'TimedeltaIndex' object has no attribute '{}'"
with pytest.raises(AttributeError, match=msg.format("hours")):
rng.hours
with pytest.raises(AttributeError, match=msg.format("minutes")):
rng.minutes
with pytest.raises(AttributeError, match=msg.format("milliseconds")):
rng.milliseconds
# with nat
s = Series(rng)
s[1] = np.nan
tm.assert_series_equal(s.dt.days, Series([1, np.nan], index=[0, 1]))
tm.assert_series_equal(
s.dt.seconds, Series([10 * 3600 + 11 * 60 + 12, np.nan], index=[0, 1])
)
# preserve name (GH15589)
rng.name = "name"
assert rng.days.name == "name"
def test_freq_conversion(self):
# doc example
# series
td = Series(date_range("20130101", periods=4)) - Series(
date_range("20121201", periods=4)
)
td[2] += timedelta(minutes=5, seconds=3)
td[3] = np.nan
result = td / np.timedelta64(1, "D")
expected = Series([31, 31, (31 * 86400 + 5 * 60 + 3) / 86400.0, np.nan])
assert_series_equal(result, expected)
result = td.astype("timedelta64[D]")
expected = Series([31, 31, 31, np.nan])
assert_series_equal(result, expected)
result = td / np.timedelta64(1, "s")
expected = Series([31 * 86400, 31 * 86400, 31 * 86400 + 5 * 60 + 3, np.nan])
assert_series_equal(result, expected)
result = td.astype("timedelta64[s]")
assert_series_equal(result, expected)
# tdi
td = TimedeltaIndex(td)
result = td / np.timedelta64(1, "D")
expected = Index([31, 31, (31 * 86400 + 5 * 60 + 3) / 86400.0, np.nan])
assert_index_equal(result, expected)
result = td.astype("timedelta64[D]")
expected = Index([31, 31, 31, np.nan])
assert_index_equal(result, expected)
result = td / np.timedelta64(1, "s")
expected = Index([31 * 86400, 31 * 86400, 31 * 86400 + 5 * 60 + 3, np.nan])
assert_index_equal(result, expected)
result = td.astype("timedelta64[s]")
assert_index_equal(result, expected)
@pytest.mark.parametrize("unit", ["Y", "y", "M"])
def test_unit_m_y_deprecated(self, unit):
with tm.assert_produces_warning(FutureWarning) as w:
TimedeltaIndex([1, 3, 7], unit)
msg = r".* units are deprecated .*"
assert re.match(msg, str(w[0].message))
class TestTimeSeries:
def test_series_box_timedelta(self):
rng = timedelta_range("1 day 1 s", periods=5, freq="h")
s = Series(rng)
assert isinstance(s[1], Timedelta)
assert isinstance(s.iat[2], Timedelta)