# -*- coding: utf-8 -*-
from collections import OrderedDict
from datetime import date, datetime, timedelta
import numpy as np
import pytest
from pandas.compat import product, range
import pandas as pd
from pandas import (
Categorical, DataFrame, Grouper, Index, MultiIndex, Series, concat,
date_range)
from pandas.api.types import CategoricalDtype as CDT
from pandas.core.reshape.pivot import crosstab, pivot_table
import pandas.util.testing as tm
@pytest.fixture(params=[True, False])
def dropna(request):
return request.param
class TestPivotTable(object):
def setup_method(self, method):
self.data = DataFrame({'A': ['foo', 'foo', 'foo', 'foo',
'bar', 'bar', 'bar', 'bar',
'foo', 'foo', 'foo'],
'B': ['one', 'one', 'one', 'two',
'one', 'one', 'one', 'two',
'two', 'two', 'one'],
'C': ['dull', 'dull', 'shiny', 'dull',
'dull', 'shiny', 'shiny', 'dull',
'shiny', 'shiny', 'shiny'],
'D': np.random.randn(11),
'E': np.random.randn(11),
'F': np.random.randn(11)})
def test_pivot_table(self):
index = ['A', 'B']
columns = 'C'
table = pivot_table(self.data, values='D',
index=index, columns=columns)
table2 = self.data.pivot_table(
values='D', index=index, columns=columns)
tm.assert_frame_equal(table, table2)
# this works
pivot_table(self.data, values='D', index=index)
if len(index) > 1:
assert table.index.names == tuple(index)
else:
assert table.index.name == index[0]
if len(columns) > 1:
assert table.columns.names == columns
else:
assert table.columns.name == columns[0]
expected = self.data.groupby(
index + [columns])['D'].agg(np.mean).unstack()
tm.assert_frame_equal(table, expected)
def test_pivot_table_nocols(self):
df = DataFrame({'rows': ['a', 'b', 'c'],
'cols': ['x', 'y', 'z'],
'values': [1, 2, 3]})
rs = df.pivot_table(columns='cols', aggfunc=np.sum)
xp = df.pivot_table(index='cols', aggfunc=np.sum).T
tm.assert_frame_equal(rs, xp)
rs = df.pivot_table(columns='cols', aggfunc={'values': 'mean'})
xp = df.pivot_table(index='cols', aggfunc={'values': 'mean'}).T
tm.assert_frame_equal(rs, xp)
def test_pivot_table_dropna(self):
df = DataFrame({'amount': {0: 60000, 1: 100000, 2: 50000, 3: 30000},
'customer': {0: 'A', 1: 'A', 2: 'B', 3: 'C'},
'month': {0: 201307, 1: 201309, 2: 201308, 3: 201310},
'product': {0: 'a', 1: 'b', 2: 'c', 3: 'd'},
'quantity': {0: 2000000, 1: 500000,
2: 1000000, 3: 1000000}})
pv_col = df.pivot_table('quantity', 'month', [
'customer', 'product'], dropna=False)
pv_ind = df.pivot_table(
'quantity', ['customer', 'product'], 'month', dropna=False)
m = MultiIndex.from_tuples([('A', 'a'), ('A', 'b'), ('A', 'c'),
('A', 'd'), ('B', 'a'), ('B', 'b'),
('B', 'c'), ('B', 'd'), ('C', 'a'),
('C', 'b'), ('C', 'c'), ('C', 'd')],
names=['customer', 'product'])
tm.assert_index_equal(pv_col.columns, m)
tm.assert_index_equal(pv_ind.index, m)
def test_pivot_table_categorical(self):
cat1 = Categorical(["a", "a", "b", "b"],
categories=["a", "b", "z"], ordered=True)
cat2 = Categorical(["c", "d", "c", "d"],
categories=["c", "d", "y"], ordered=True)
df = DataFrame({"A": cat1, "B": cat2, "values": [1, 2, 3, 4]})
result = pd.pivot_table(df, values='values', index=['A', 'B'],
dropna=True)
exp_index = pd.MultiIndex.from_arrays(
[cat1, cat2],
names=['A', 'B'])
expected = DataFrame(
{'values': [1, 2, 3, 4]},
index=exp_index)
tm.assert_frame_equal(result, expected)
def test_pivot_table_dropna_categoricals(self, dropna):
# GH 15193
categories = ['a', 'b', 'c', 'd']
df = DataFrame({'A': ['a', 'a', 'a', 'b', 'b', 'b', 'c', 'c', 'c'],
'B': [1, 2, 3, 1, 2, 3, 1, 2, 3],
'C': range(0, 9)})
df['A'] = df['A'].astype(CDT(categories, ordered=False))
result = df.pivot_table(index='B', columns='A', values='C',
dropna=dropna)
expected_columns = Series(['a', 'b', 'c'], name='A')
expected_columns = expected_columns.astype(
CDT(categories, ordered=False))
expected_index = Series([1, 2, 3], name='B')
expected = DataFrame([[0, 3, 6],
[1, 4, 7],
[2, 5, 8]],
index=expected_index,
columns=expected_columns,)
if not dropna:
# add back the non observed to compare
expected = expected.reindex(
columns=Categorical(categories)).astype('float')
tm.assert_frame_equal(result, expected)
def test_pivot_with_non_observable_dropna(self, dropna):
# gh-21133
df = pd.DataFrame(
{'A': pd.Categorical([np.nan, 'low', 'high', 'low', 'high'],
categories=['low', 'high'],
ordered=True),
'B': range(5)})
result = df.pivot_table(index='A', values='B', dropna=dropna)
expected = pd.DataFrame(
{'B': [2, 3]},
index=pd.Index(
pd.Categorical.from_codes([0, 1],
categories=['low', 'high'],
ordered=True),
name='A'))
tm.assert_frame_equal(result, expected)
# gh-21378
df = pd.DataFrame(
{'A': pd.Categorical(['left', 'low', 'high', 'low', 'high'],
categories=['low', 'high', 'left'],
ordered=True),
'B': range(5)})
result = df.pivot_table(index='A', values='B', dropna=dropna)
expected = pd.DataFrame(
{'B': [2, 3, 0]},
index=pd.Index(
pd.Categorical.from_codes([0, 1, 2],
categories=['low', 'high', 'left'],
ordered=True),
name='A'))
tm.assert_frame_equal(result, expected)
def test_pass_array(self):
result = self.data.pivot_table(
'D', index=self.data.A, columns=self.data.C)
expected = self.data.pivot_table('D', index='A', columns='C')
tm.assert_frame_equal(result, expected)
def test_pass_function(self):
result = self.data.pivot_table('D', index=lambda x: x // 5,
columns=self.data.C)
expected = self.data.pivot_table('D', index=self.data.index // 5,
columns='C')
tm.assert_frame_equal(result, expected)
def test_pivot_table_multiple(self):
index = ['A', 'B']
columns = 'C'
table = pivot_table(self.data, index=index, columns=columns)
expected = self.data.groupby(index + [columns]).agg(np.mean).unstack()
tm.assert_frame_equal(table, expected)
def test_pivot_dtypes(self):
# can convert dtypes
f = DataFrame({'a': ['cat', 'bat', 'cat', 'bat'], 'v': [
1, 2, 3, 4], 'i': ['a', 'b', 'a', 'b']})
assert f.dtypes['v'] == 'int64'
z = pivot_table(f, values='v', index=['a'], columns=[
'i'], fill_value=0, aggfunc=np.sum)
result = z.get_dtype_counts()
expected = Series(dict(int64=2))
tm.assert_series_equal(result, expected)
# cannot convert dtypes
f = DataFrame({'a': ['cat', 'bat', 'cat', 'bat'], 'v': [
1.5, 2.5, 3.5, 4.5], 'i': ['a', 'b', 'a', 'b']})
assert f.dtypes['v'] == 'float64'
z = pivot_table(f, values='v', index=['a'], columns=[
'i'], fill_value=0, aggfunc=np.mean)
result = z.get_dtype_counts()
expected = Series(dict(float64=2))
tm.assert_series_equal(result, expected)
@pytest.mark.parametrize('columns,values',
[('bool1', ['float1', 'float2']),
('bool1', ['float1', 'float2', 'bool1']),
('bool2', ['float1', 'float2', 'bool1'])])
def test_pivot_preserve_dtypes(self, columns, values):
# GH 7142 regression test
v = np.arange(5, dtype=np.float64)
df = DataFrame({'float1': v, 'float2': v + 2.0,
'bool1': v <= 2, 'bool2': v <= 3})
df_res = df.reset_index().pivot_table(
index='index', columns=columns, values=values)
result = dict(df_res.dtypes)
expected = {col: np.dtype('O') if col[0].startswith('b')
else np.dtype('float64') for col in df_res}
assert result == expected
def test_pivot_no_values(self):
# GH 14380
idx = pd.DatetimeIndex(['2011-01-01', '2011-02-01', '2011-01-02',
'2011-01-01', '2011-01-02'])
df = pd.DataFrame({'A': [1, 2, 3, 4, 5]},
index=idx)
res = df.pivot_table(index=df.index.month, columns=df.index.day)
exp_columns = pd.MultiIndex.from_tuples([('A', 1), ('A', 2)])
exp = pd.DataFrame([[2.5, 4.0], [2.0, np.nan]],
index=[1, 2], columns=exp_columns)
tm.assert_frame_equal(res, exp)
df = pd.DataFrame({'A': [1, 2, 3, 4, 5],
'dt': pd.date_range('2011-01-01', freq='D',
periods=5)},
index=idx)
res = df.pivot_table(index=df.index.month,
columns=pd.Grouper(key='dt', freq='M'))
exp_columns = pd.MultiIndex.from_tuples([('A',
pd.Timestamp('2011-01-31'))])
exp_columns.names = [None, 'dt']
exp = pd.DataFrame([3.25, 2.0],
index=[1, 2], columns=exp_columns)
tm.assert_frame_equal(res, exp)
res = df.pivot_table(index=pd.Grouper(freq='A'),
columns=pd.Grouper(key='dt', freq='M'))
exp = pd.DataFrame([3],
index=pd.DatetimeIndex(['2011-12-31']),
columns=exp_columns)
tm.assert_frame_equal(res, exp)
def test_pivot_multi_values(self):
result = pivot_table(self.data, values=['D', 'E'],
index='A', columns=['B', 'C'], fill_value=0)
expected = pivot_table(self.data.drop(['F'], axis=1),
index='A', columns=['B', 'C'], fill_value=0)
tm.assert_frame_equal(result, expected)
def test_pivot_multi_functions(self):
f = lambda func: pivot_table(self.data, values=['D', 'E'],
index=['A', 'B'], columns='C',
aggfunc=func)
result = f([np.mean, np.std])
means = f(np.mean)
stds = f(np.std)
expected = concat([means, stds], keys=['mean', 'std'], axis=1)
tm.assert_frame_equal(result, expected)
# margins not supported??
f = lambda func: pivot_table(self.data, values=['D', 'E'],
index=['A', 'B'], columns='C',
aggfunc=func, margins=True)
result = f([np.mean, np.std])
means = f(np.mean)
stds = f(np.std)
expected = concat([means, stds], keys=['mean', 'std'], axis=1)
tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize('method', [True, False])
def test_pivot_index_with_nan(self, method):
# GH 3588
nan = np.nan
df = DataFrame({'a': ['R1', 'R2', nan, 'R4'],
'b': ['C1', 'C2', 'C3', 'C4'],
'c': [10, 15, 17, 20]})
if method:
result = df.pivot('a', 'b', 'c')
else:
result = pd.pivot(df, 'a', 'b', 'c')
expected = DataFrame([[nan, nan, 17, nan], [10, nan, nan, nan],
[nan, 15, nan, nan], [nan, nan, nan, 20]],
index=Index([nan, 'R1', 'R2', 'R4'], name='a'),
columns=Index(['C1', 'C2', 'C3', 'C4'], name='b'))
tm.assert_frame_equal(result, expected)
tm.assert_frame_equal(df.pivot('b', 'a', 'c'), expected.T)
# GH9491
df = DataFrame({'a': pd.date_range('2014-02-01', periods=6, freq='D'),
'c': 100 + np.arange(6)})
df['b'] = df['a'] - pd.Timestamp('2014-02-02')
df.loc[1, 'a'] = df.loc[3, 'a'] = nan
df.loc[1, 'b'] = df.loc[4, 'b'] = nan
if method:
pv = df.pivot('a', 'b', 'c')
else:
pv = pd.pivot(df, 'a', 'b', 'c')
assert pv.notna().values.sum() == len(df)
for _, row in df.iterrows():
assert pv.loc[row['a'], row['b']] == row['c']
if method:
result = df.pivot('b', 'a', 'c')
else:
result = pd.pivot(df, 'b', 'a', 'c')
tm.assert_frame_equal(result, pv.T)
@pytest.mark.parametrize('method', [True, False])
def test_pivot_with_tz(self, method):
Loading ...