# -*- coding: utf-8 -*-
from __future__ import print_function
from datetime import datetime, time
import numpy as np
import pytest
from pandas.compat import product
import pandas as pd
from pandas import (
DataFrame, DatetimeIndex, Index, MultiIndex, Series, Timestamp, date_range,
period_range, to_datetime)
from pandas.tests.frame.common import TestData
import pandas.util.testing as tm
from pandas.util.testing import (
assert_frame_equal, assert_index_equal, assert_series_equal)
import pandas.tseries.offsets as offsets
@pytest.fixture(
    params=product([True, False], [True, False]))
def close_open_fixture(request):
    """Parametrized fixture over ``product([True, False], [True, False])``.

    A test requesting this fixture runs once per element of that product
    and receives the element unchanged.
    NOTE(review): the name suggests the two flags select closed/open
    interval ends -- confirm against the tests that consume it.
    """
    flag_pair = request.param
    return flag_pair
class TestDataFrameTimeSeriesMethods(TestData):
def test_diff(self):
the_diff = self.tsframe.diff(1)
assert_series_equal(the_diff['A'],
self.tsframe['A'] - self.tsframe['A'].shift(1))
# int dtype
a = 10000000000000000
b = a + 1
s = Series([a, b])
rs = DataFrame({'s': s}).diff()
assert rs.s[1] == 1
# mixed numeric
tf = self.tsframe.astype('float32')
the_diff = tf.diff(1)
assert_series_equal(the_diff['A'],
tf['A'] - tf['A'].shift(1))
# issue 10907
df = pd.DataFrame({'y': pd.Series([2]), 'z': pd.Series([3])})
df.insert(0, 'x', 1)
result = df.diff(axis=1)
expected = pd.DataFrame({'x': np.nan, 'y': pd.Series(
1), 'z': pd.Series(1)}).astype('float64')
assert_frame_equal(result, expected)
@pytest.mark.parametrize('tz', [None, 'UTC'])
def test_diff_datetime_axis0(self, tz):
    """Row-wise diff of two datetime columns, for naive and UTC input."""
    # GH 18578
    labels = (0, 1)
    df = DataFrame({label: date_range('2010', freq='D', periods=2, tz=tz)
                    for label in labels})
    # each column is expected to become [NaT, 1 day]
    expected = DataFrame({label: pd.TimedeltaIndex(['NaT', '1 days'])
                          for label in labels})
    result = df.diff(axis=0)
    assert_frame_equal(result, expected)
@pytest.mark.parametrize('tz', [None, 'UTC'])
def test_diff_datetime_axis1(self, tz):
    """Column-wise diff of two datetime columns; tz-aware input raises."""
    # GH 18578
    stamps = {label: date_range('2010', freq='D', periods=2, tz=tz)
              for label in (0, 1)}
    df = DataFrame(stamps)

    if tz is not None:
        # the tz-aware case is expected to raise NotImplementedError
        with pytest.raises(NotImplementedError):
            df.diff(axis=1)
        return

    # naive case: first column all NaT, second column all zero deltas
    expected = DataFrame({0: pd.TimedeltaIndex(['NaT', 'NaT']),
                          1: pd.TimedeltaIndex(['0 days', '0 days'])})
    result = df.diff(axis=1)
    assert_frame_equal(result, expected)
def test_diff_timedelta(self):
# GH 4533
df = DataFrame(dict(time=[Timestamp('20130101 9:01'),
Timestamp('20130101 9:02')],
value=[1.0, 2.0]))
res = df.diff()
exp = DataFrame([[pd.NaT, np.nan],
[pd.Timedelta('00:01:00'), 1]],
columns=['time', 'value'])
assert_frame_equal(res, exp)
def test_diff_mixed_dtype(self):
df = DataFrame(np.random.randn(5, 3))
df['A'] = np.array([1, 2, 3, 4, 5], dtype=object)
result = df.diff()
assert result[0].dtype == np.float64
def test_diff_neg_n(self):
rs = self.tsframe.diff(-1)
xp = self.tsframe - self.tsframe.shift(-1)
assert_frame_equal(rs, xp)
def test_diff_float_n(self):
rs = self.tsframe.diff(1.)
xp = self.tsframe.diff(1)
assert_frame_equal(rs, xp)
def test_diff_axis(self):
# GH 9727
df = DataFrame([[1., 2.], [3., 4.]])
assert_frame_equal(df.diff(axis=1), DataFrame(
[[np.nan, 1.], [np.nan, 1.]]))
assert_frame_equal(df.diff(axis=0), DataFrame(
[[np.nan, np.nan], [2., 2.]]))
def test_pct_change(self):
rs = self.tsframe.pct_change(fill_method=None)
assert_frame_equal(rs, self.tsframe / self.tsframe.shift(1) - 1)
rs = self.tsframe.pct_change(2)
filled = self.tsframe.fillna(method='pad')
assert_frame_equal(rs, filled / filled.shift(2) - 1)
rs = self.tsframe.pct_change(fill_method='bfill', limit=1)
filled = self.tsframe.fillna(method='bfill', limit=1)
assert_frame_equal(rs, filled / filled.shift(1) - 1)
rs = self.tsframe.pct_change(freq='5D')
filled = self.tsframe.fillna(method='pad')
assert_frame_equal(rs,
(filled / filled.shift(freq='5D') - 1)
.reindex_like(filled))
def test_pct_change_shift_over_nas(self):
s = Series([1., 1.5, np.nan, 2.5, 3.])
df = DataFrame({'a': s, 'b': s})
chg = df.pct_change()
expected = Series([np.nan, 0.5, 0., 2.5 / 1.5 - 1, .2])
edf = DataFrame({'a': expected, 'b': expected})
assert_frame_equal(chg, edf)
@pytest.mark.parametrize("freq, periods, fill_method, limit",
                         [('5B', 5, None, None),
                          ('3B', 3, None, None),
                          ('3B', 3, 'bfill', None),
                          ('7B', 7, 'pad', 1),
                          ('7B', 7, 'bfill', 3),
                          ('14B', 14, None, None)])
def test_pct_change_periods_freq(self, freq, periods, fill_method, limit):
    """pct_change by ``freq`` must agree with pct_change by ``periods``."""
    # GH 7292
    fill_kwargs = {'fill_method': fill_method, 'limit': limit}
    populated = self.tsframe
    # same index and columns as the shared frame, but no values supplied
    empty = DataFrame(index=populated.index, columns=populated.columns)

    for frame in (populated, empty):
        by_freq = frame.pct_change(freq=freq, **fill_kwargs)
        by_periods = frame.pct_change(periods, **fill_kwargs)
        assert_frame_equal(by_freq, by_periods)
def test_frame_ctor_datetime64_column(self):
rng = date_range('1/1/2000 00:00:00', '1/1/2000 1:59:50', freq='10s')
dates = np.asarray(rng)
df = DataFrame({'A': np.random.randn(len(rng)), 'B': dates})
assert np.issubdtype(df['B'].dtype, np.dtype('M8[ns]'))
def test_frame_append_datetime64_column(self):
rng = date_range('1/1/2000 00:00:00', '1/1/2000 1:59:50', freq='10s')
df = DataFrame(index=np.arange(len(rng)))
df['A'] = rng
assert np.issubdtype(df['A'].dtype, np.dtype('M8[ns]'))
def test_frame_datetime64_pre1900_repr(self):
df = DataFrame({'year': date_range('1/1/1700', periods=50,
freq='A-DEC')})
# it works!
repr(df)
def test_frame_append_datetime64_col_other_units(self):
    """Insert datetime64 arrays of several units as frame columns.

    For each unit the stored values are compared with ``to_datetime`` of
    the same data; for brand-new columns the dtype must also be M8[ns].
    """
    size = 100
    ns_dtype = np.dtype('M8[ns]')
    units = ['h', 'm', 's', 'ms', 'D', 'M', 'Y']

    def make_frame():
        # fresh integer frame with a 0..size-1 index
        return DataFrame({'ints': np.arange(size)}, index=np.arange(size))

    def values_in(unit):
        # 0..size-1 reinterpreted as datetime64 of the given unit
        unit_dtype = np.dtype('M8[%s]' % unit)
        return np.arange(size, dtype=np.int64).view(unit_dtype)

    for unit in units:
        vals = values_in(unit)
        frame = make_frame()
        frame[unit] = vals
        expected = to_datetime(vals.astype('O')).values

        assert frame[unit].dtype == ns_dtype
        assert (frame[unit].values == expected).all()

    # Test insertion into existing datetime64 column
    base = make_frame()
    base['dates'] = np.arange(size, dtype=np.int64).view(ns_dtype)

    for unit in units:
        vals = values_in(unit)
        scratch = base.copy()
        scratch['dates'] = vals
        expected = to_datetime(vals.astype('O')).values

        assert (scratch['dates'].values == expected).all()
def test_shift(self):
    """Exercise DataFrame.shift: by periods, by freq, on periods, by axis."""
    frame = self.tsframe

    # naive shift: index is unchanged and column 'A' matches Series.shift
    for periods in (5, -5):
        shifted_frame = frame.shift(periods)
        tm.assert_index_equal(shifted_frame.index, frame.index)
        shifted_col = frame['A'].shift(periods)
        assert_series_equal(shifted_frame['A'], shifted_col)

    # shift by 0
    assert_frame_equal(frame.shift(0), frame)

    # shift by DateOffset
    by_offset = frame.shift(5, freq=offsets.BDay())
    assert len(by_offset) == len(frame)

    by_alias = frame.shift(5, freq='B')
    assert_frame_equal(by_offset, by_alias)

    first = frame.index[0]
    moved_first = first + offsets.BDay(5)
    assert_series_equal(frame.xs(first),
                        by_offset.xs(moved_first), check_names=False)

    # shift int frame (only checks that the call succeeds)
    self.intframe.shift(1)

    # Shifting with PeriodIndex
    period_frame = tm.makePeriodFrame()
    forward = period_frame.shift(1)
    restored = forward.shift(-1)
    tm.assert_index_equal(forward.index, period_frame.index)
    tm.assert_index_equal(restored.index, period_frame.index)
    tm.assert_numpy_array_equal(restored.iloc[:, 0].dropna().values,
                                period_frame.iloc[:-1, 0].values)

    period_by_alias = period_frame.shift(1, 'B')
    period_by_offset = period_frame.shift(1, offsets.BDay())
    assert_frame_equal(period_by_alias, period_by_offset)
    assert_frame_equal(period_frame, period_by_alias.shift(-1, 'B'))

    msg = 'does not match PeriodIndex freq'
    with pytest.raises(ValueError, match=msg):
        period_frame.shift(freq='D')

    # shift other axis (GH 6371), first by position and then by name
    for axis in (1, 'columns'):
        df = DataFrame(np.random.rand(10, 5))
        nan_col = DataFrame(np.nan, index=df.index, columns=[0])
        expected = pd.concat([nan_col, df.iloc[:, 0:-1]],
                             ignore_index=True, axis=1)
        result = df.shift(1, axis=axis)
        assert_frame_equal(result, expected)
def test_shift_bool(self):
df = DataFrame({'high': [True, False],
'low': [False, False]})
rs = df.shift(1)
xp = DataFrame(np.array([[np.nan, np.nan],
[True, False]], dtype=object),
columns=['high', 'low'])
assert_frame_equal(rs, xp)
def test_shift_categorical(self):
# GH 9416
s1 = pd.Series(['a', 'b', 'c'], dtype='category')
s2 = pd.Series(['A', 'B', 'C'], dtype='category')
df = DataFrame({'one': s1, 'two': s2})
rs = df.shift(1)
xp = DataFrame({'one': s1.shift(1), 'two': s2.shift(1)})
assert_frame_equal(rs, xp)
def test_shift_fill_value(self):
# GH #24128
df = DataFrame([1, 2, 3, 4, 5],
index=date_range('1/1/2000', periods=5, freq='H'))
exp = DataFrame([0, 1, 2, 3, 4],
index=date_range('1/1/2000', periods=5, freq='H'))
result = df.shift(1, fill_value=0)
assert_frame_equal(result, exp)
exp = DataFrame([0, 0, 1, 2, 3],
index=date_range('1/1/2000', periods=5, freq='H'))
result = df.shift(2, fill_value=0)
assert_frame_equal(result, exp)
def test_shift_empty(self):
# Regression test for #8019
df = DataFrame({'foo': []})
rs = df.shift(-1)
assert_frame_equal(df, rs)
def test_shift_duplicate_columns(self):
# GH 9092; verify that position-based shifting works
# in the presence of duplicate columns
column_lists = [list(range(5)), [1] * 5, [1, 1, 2, 2, 1]]
data = np.random.randn(20, 5)
Loading ...