# -*- coding: utf-8 -*-
# pylint: disable-msg=W0612,E1101,W0141
import datetime
import itertools
from warnings import catch_warnings, simplefilter
import numpy as np
from numpy.random import randn
import pytest
import pytz
from pandas.compat import (
StringIO, lrange, lzip, product as cart_product, range, u, zip)
from pandas.core.dtypes.common import is_float_dtype, is_integer_dtype
import pandas as pd
from pandas import DataFrame, Panel, Series, Timestamp, isna
from pandas.core.index import Index, MultiIndex
import pandas.util.testing as tm
AGG_FUNCTIONS = ['sum', 'prod', 'min', 'max', 'median', 'mean', 'skew', 'mad',
'std', 'var', 'sem']
class Base(object):
def setup_method(self, method):
index = MultiIndex(levels=[['foo', 'bar', 'baz', 'qux'], ['one', 'two',
'three']],
codes=[[0, 0, 0, 1, 1, 2, 2, 3, 3, 3],
[0, 1, 2, 0, 1, 1, 2, 0, 1, 2]],
names=['first', 'second'])
self.frame = DataFrame(np.random.randn(10, 3), index=index,
columns=Index(['A', 'B', 'C'], name='exp'))
self.single_level = MultiIndex(levels=[['foo', 'bar', 'baz', 'qux']],
codes=[[0, 1, 2, 3]], names=['first'])
# create test series object
arrays = [['bar', 'bar', 'baz', 'baz', 'qux', 'qux', 'foo', 'foo'],
['one', 'two', 'one', 'two', 'one', 'two', 'one', 'two']]
tuples = lzip(*arrays)
index = MultiIndex.from_tuples(tuples)
s = Series(randn(8), index=index)
s[3] = np.NaN
self.series = s
self.tdf = tm.makeTimeDataFrame(100)
self.ymd = self.tdf.groupby([lambda x: x.year, lambda x: x.month,
lambda x: x.day]).sum()
# use Int64Index, to make sure things work
self.ymd.index.set_levels([lev.astype('i8')
for lev in self.ymd.index.levels],
inplace=True)
self.ymd.index.set_names(['year', 'month', 'day'], inplace=True)
class TestMultiLevel(Base):
def test_append(self):
a, b = self.frame[:5], self.frame[5:]
result = a.append(b)
tm.assert_frame_equal(result, self.frame)
result = a['A'].append(b['A'])
tm.assert_series_equal(result, self.frame['A'])
def test_append_index(self):
idx1 = Index([1.1, 1.2, 1.3])
idx2 = pd.date_range('2011-01-01', freq='D', periods=3,
tz='Asia/Tokyo')
idx3 = Index(['A', 'B', 'C'])
midx_lv2 = MultiIndex.from_arrays([idx1, idx2])
midx_lv3 = MultiIndex.from_arrays([idx1, idx2, idx3])
result = idx1.append(midx_lv2)
# see gh-7112
tz = pytz.timezone('Asia/Tokyo')
expected_tuples = [(1.1, tz.localize(datetime.datetime(2011, 1, 1))),
(1.2, tz.localize(datetime.datetime(2011, 1, 2))),
(1.3, tz.localize(datetime.datetime(2011, 1, 3)))]
expected = Index([1.1, 1.2, 1.3] + expected_tuples)
tm.assert_index_equal(result, expected)
result = midx_lv2.append(idx1)
expected = Index(expected_tuples + [1.1, 1.2, 1.3])
tm.assert_index_equal(result, expected)
result = midx_lv2.append(midx_lv2)
expected = MultiIndex.from_arrays([idx1.append(idx1),
idx2.append(idx2)])
tm.assert_index_equal(result, expected)
result = midx_lv2.append(midx_lv3)
tm.assert_index_equal(result, expected)
result = midx_lv3.append(midx_lv2)
expected = Index._simple_new(
np.array([(1.1, tz.localize(datetime.datetime(2011, 1, 1)), 'A'),
(1.2, tz.localize(datetime.datetime(2011, 1, 2)), 'B'),
(1.3, tz.localize(datetime.datetime(2011, 1, 3)), 'C')] +
expected_tuples), None)
tm.assert_index_equal(result, expected)
def test_dataframe_constructor(self):
multi = DataFrame(np.random.randn(4, 4),
index=[np.array(['a', 'a', 'b', 'b']),
np.array(['x', 'y', 'x', 'y'])])
assert isinstance(multi.index, MultiIndex)
assert not isinstance(multi.columns, MultiIndex)
multi = DataFrame(np.random.randn(4, 4),
columns=[['a', 'a', 'b', 'b'],
['x', 'y', 'x', 'y']])
assert isinstance(multi.columns, MultiIndex)
def test_series_constructor(self):
multi = Series(1., index=[np.array(['a', 'a', 'b', 'b']), np.array(
['x', 'y', 'x', 'y'])])
assert isinstance(multi.index, MultiIndex)
multi = Series(1., index=[['a', 'a', 'b', 'b'], ['x', 'y', 'x', 'y']])
assert isinstance(multi.index, MultiIndex)
multi = Series(lrange(4), index=[['a', 'a', 'b', 'b'],
['x', 'y', 'x', 'y']])
assert isinstance(multi.index, MultiIndex)
def test_reindex_level(self):
# axis=0
month_sums = self.ymd.sum(level='month')
result = month_sums.reindex(self.ymd.index, level=1)
expected = self.ymd.groupby(level='month').transform(np.sum)
tm.assert_frame_equal(result, expected)
# Series
result = month_sums['A'].reindex(self.ymd.index, level=1)
expected = self.ymd['A'].groupby(level='month').transform(np.sum)
tm.assert_series_equal(result, expected, check_names=False)
# axis=1
month_sums = self.ymd.T.sum(axis=1, level='month')
result = month_sums.reindex(columns=self.ymd.index, level=1)
expected = self.ymd.groupby(level='month').transform(np.sum).T
tm.assert_frame_equal(result, expected)
def test_binops_level(self):
def _check_op(opname):
op = getattr(DataFrame, opname)
month_sums = self.ymd.sum(level='month')
result = op(self.ymd, month_sums, level='month')
broadcasted = self.ymd.groupby(level='month').transform(np.sum)
expected = op(self.ymd, broadcasted)
tm.assert_frame_equal(result, expected)
# Series
op = getattr(Series, opname)
result = op(self.ymd['A'], month_sums['A'], level='month')
broadcasted = self.ymd['A'].groupby(level='month').transform(
np.sum)
expected = op(self.ymd['A'], broadcasted)
expected.name = 'A'
tm.assert_series_equal(result, expected)
_check_op('sub')
_check_op('add')
_check_op('mul')
_check_op('div')
def test_pickle(self):
def _test_roundtrip(frame):
unpickled = tm.round_trip_pickle(frame)
tm.assert_frame_equal(frame, unpickled)
_test_roundtrip(self.frame)
_test_roundtrip(self.frame.T)
_test_roundtrip(self.ymd)
_test_roundtrip(self.ymd.T)
def test_reindex(self):
expected = self.frame.iloc[[0, 3]]
reindexed = self.frame.loc[[('foo', 'one'), ('bar', 'one')]]
tm.assert_frame_equal(reindexed, expected)
with catch_warnings(record=True):
simplefilter("ignore", DeprecationWarning)
reindexed = self.frame.ix[[('foo', 'one'), ('bar', 'one')]]
tm.assert_frame_equal(reindexed, expected)
def test_reindex_preserve_levels(self):
new_index = self.ymd.index[::10]
chunk = self.ymd.reindex(new_index)
assert chunk.index is new_index
chunk = self.ymd.loc[new_index]
assert chunk.index is new_index
with catch_warnings(record=True):
simplefilter("ignore", DeprecationWarning)
chunk = self.ymd.ix[new_index]
assert chunk.index is new_index
ymdT = self.ymd.T
chunk = ymdT.reindex(columns=new_index)
assert chunk.columns is new_index
chunk = ymdT.loc[:, new_index]
assert chunk.columns is new_index
def test_repr_to_string(self):
repr(self.frame)
repr(self.ymd)
repr(self.frame.T)
repr(self.ymd.T)
buf = StringIO()
self.frame.to_string(buf=buf)
self.ymd.to_string(buf=buf)
self.frame.T.to_string(buf=buf)
self.ymd.T.to_string(buf=buf)
def test_repr_name_coincide(self):
index = MultiIndex.from_tuples([('a', 0, 'foo'), ('b', 1, 'bar')],
names=['a', 'b', 'c'])
df = DataFrame({'value': [0, 1]}, index=index)
lines = repr(df).split('\n')
assert lines[2].startswith('a 0 foo')
def test_delevel_infer_dtype(self):
tuples = [tuple
for tuple in cart_product(
['foo', 'bar'], [10, 20], [1.0, 1.1])]
index = MultiIndex.from_tuples(tuples, names=['prm0', 'prm1', 'prm2'])
df = DataFrame(np.random.randn(8, 3), columns=['A', 'B', 'C'],
index=index)
deleveled = df.reset_index()
assert is_integer_dtype(deleveled['prm1'])
assert is_float_dtype(deleveled['prm2'])
def test_reset_index_with_drop(self):
deleveled = self.ymd.reset_index(drop=True)
assert len(deleveled.columns) == len(self.ymd.columns)
assert deleveled.index.name == self.ymd.index.name
deleveled = self.series.reset_index()
assert isinstance(deleveled, DataFrame)
assert len(deleveled.columns) == len(self.series.index.levels) + 1
assert deleveled.index.name == self.series.index.name
deleveled = self.series.reset_index(drop=True)
assert isinstance(deleveled, Series)
assert deleveled.index.name == self.series.index.name
def test_count_level(self):
def _check_counts(frame, axis=0):
index = frame._get_axis(axis)
for i in range(index.nlevels):
result = frame.count(axis=axis, level=i)
expected = frame.groupby(axis=axis, level=i).count()
expected = expected.reindex_like(result).astype('i8')
tm.assert_frame_equal(result, expected)
self.frame.iloc[1, [1, 2]] = np.nan
self.frame.iloc[7, [0, 1]] = np.nan
self.ymd.iloc[1, [1, 2]] = np.nan
self.ymd.iloc[7, [0, 1]] = np.nan
_check_counts(self.frame)
_check_counts(self.ymd)
_check_counts(self.frame.T, axis=1)
_check_counts(self.ymd.T, axis=1)
# can't call with level on regular DataFrame
df = tm.makeTimeDataFrame()
with pytest.raises(TypeError, match='hierarchical'):
df.count(level=0)
self.frame['D'] = 'foo'
result = self.frame.count(level=0, numeric_only=True)
tm.assert_index_equal(result.columns, Index(list('ABC'), name='exp'))
def test_count_level_series(self):
index = MultiIndex(levels=[['foo', 'bar', 'baz'], ['one', 'two',
'three', 'four']],
codes=[[0, 0, 0, 2, 2], [2, 0, 1, 1, 2]])
s = Series(np.random.randn(len(index)), index=index)
result = s.count(level=0)
expected = s.groupby(level=0).count()
tm.assert_series_equal(
result.astype('f8'), expected.reindex(result.index).fillna(0))
result = s.count(level=1)
expected = s.groupby(level=1).count()
tm.assert_series_equal(
result.astype('f8'), expected.reindex(result.index).fillna(0))
def test_count_level_corner(self):
s = self.frame['A'][:0]
result = s.count(level=0)
expected = Series(0, index=s.index.levels[0], name='A')
tm.assert_series_equal(result, expected)
df = self.frame[:0]
result = df.count(level=0)
expected = DataFrame({}, index=s.index.levels[0],
columns=df.columns).fillna(0).astype(np.int64)
tm.assert_frame_equal(result, expected)
def test_get_level_number_out_of_bounds(self):
with pytest.raises(IndexError, match="Too many levels"):
self.frame.index._get_level_number(2)
with pytest.raises(IndexError, match="not a valid level number"):
self.frame.index._get_level_number(-3)
def test_unstack(self):
# just check that it works for now
unstacked = self.ymd.unstack()
unstacked.unstack()
# test that ints work
self.ymd.astype(int).unstack()
# test that int32 work
self.ymd.astype(np.int32).unstack()
def test_unstack_multiple_no_empty_columns(self):
index = MultiIndex.from_tuples([(0, 'foo', 0), (0, 'bar', 0), (
1, 'baz', 1), (1, 'qux', 1)])
s = Series(np.random.randn(4), index=index)
unstacked = s.unstack([1, 2])
expected = unstacked.dropna(axis=1, how='all')
Loading ...