Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / pandas   python

Repository URL to install this package:

Version: 0.24.2 

/ tests / reshape / test_pivot.py

# -*- coding: utf-8 -*-

from collections import OrderedDict
from datetime import date, datetime, timedelta

import numpy as np
import pytest

from pandas.compat import product, range

import pandas as pd
from pandas import (
    Categorical, DataFrame, Grouper, Index, MultiIndex, Series, concat,
    date_range)
from pandas.api.types import CategoricalDtype as CDT
from pandas.core.reshape.pivot import crosstab, pivot_table
import pandas.util.testing as tm


@pytest.fixture(params=[True, False])
def dropna(request):
    """Expose the active fixture parameter as the ``dropna`` flag."""
    flag = request.param
    return flag


class TestPivotTable(object):

    def setup_method(self, method):
        self.data = DataFrame({'A': ['foo', 'foo', 'foo', 'foo',
                                     'bar', 'bar', 'bar', 'bar',
                                     'foo', 'foo', 'foo'],
                               'B': ['one', 'one', 'one', 'two',
                                     'one', 'one', 'one', 'two',
                                     'two', 'two', 'one'],
                               'C': ['dull', 'dull', 'shiny', 'dull',
                                     'dull', 'shiny', 'shiny', 'dull',
                                     'shiny', 'shiny', 'shiny'],
                               'D': np.random.randn(11),
                               'E': np.random.randn(11),
                               'F': np.random.randn(11)})

    def test_pivot_table(self):
        index = ['A', 'B']
        columns = 'C'
        table = pivot_table(self.data, values='D',
                            index=index, columns=columns)

        table2 = self.data.pivot_table(
            values='D', index=index, columns=columns)
        tm.assert_frame_equal(table, table2)

        # this works
        pivot_table(self.data, values='D', index=index)

        if len(index) > 1:
            assert table.index.names == tuple(index)
        else:
            assert table.index.name == index[0]

        if len(columns) > 1:
            assert table.columns.names == columns
        else:
            assert table.columns.name == columns[0]

        expected = self.data.groupby(
            index + [columns])['D'].agg(np.mean).unstack()
        tm.assert_frame_equal(table, expected)

    def test_pivot_table_nocols(self):
        df = DataFrame({'rows': ['a', 'b', 'c'],
                        'cols': ['x', 'y', 'z'],
                        'values': [1, 2, 3]})
        rs = df.pivot_table(columns='cols', aggfunc=np.sum)
        xp = df.pivot_table(index='cols', aggfunc=np.sum).T
        tm.assert_frame_equal(rs, xp)

        rs = df.pivot_table(columns='cols', aggfunc={'values': 'mean'})
        xp = df.pivot_table(index='cols', aggfunc={'values': 'mean'}).T
        tm.assert_frame_equal(rs, xp)

    def test_pivot_table_dropna(self):
        df = DataFrame({'amount': {0: 60000, 1: 100000, 2: 50000, 3: 30000},
                        'customer': {0: 'A', 1: 'A', 2: 'B', 3: 'C'},
                        'month': {0: 201307, 1: 201309, 2: 201308, 3: 201310},
                        'product': {0: 'a', 1: 'b', 2: 'c', 3: 'd'},
                        'quantity': {0: 2000000, 1: 500000,
                                     2: 1000000, 3: 1000000}})
        pv_col = df.pivot_table('quantity', 'month', [
                                'customer', 'product'], dropna=False)
        pv_ind = df.pivot_table(
            'quantity', ['customer', 'product'], 'month', dropna=False)

        m = MultiIndex.from_tuples([('A', 'a'), ('A', 'b'), ('A', 'c'),
                                    ('A', 'd'), ('B', 'a'), ('B', 'b'),
                                    ('B', 'c'), ('B', 'd'), ('C', 'a'),
                                    ('C', 'b'), ('C', 'c'), ('C', 'd')],
                                   names=['customer', 'product'])
        tm.assert_index_equal(pv_col.columns, m)
        tm.assert_index_equal(pv_ind.index, m)

    def test_pivot_table_categorical(self):

        cat1 = Categorical(["a", "a", "b", "b"],
                           categories=["a", "b", "z"], ordered=True)
        cat2 = Categorical(["c", "d", "c", "d"],
                           categories=["c", "d", "y"], ordered=True)
        df = DataFrame({"A": cat1, "B": cat2, "values": [1, 2, 3, 4]})
        result = pd.pivot_table(df, values='values', index=['A', 'B'],
                                dropna=True)

        exp_index = pd.MultiIndex.from_arrays(
            [cat1, cat2],
            names=['A', 'B'])
        expected = DataFrame(
            {'values': [1, 2, 3, 4]},
            index=exp_index)
        tm.assert_frame_equal(result, expected)

    def test_pivot_table_dropna_categoricals(self, dropna):
        # GH 15193
        categories = ['a', 'b', 'c', 'd']

        df = DataFrame({'A': ['a', 'a', 'a', 'b', 'b', 'b', 'c', 'c', 'c'],
                        'B': [1, 2, 3, 1, 2, 3, 1, 2, 3],
                        'C': range(0, 9)})

        df['A'] = df['A'].astype(CDT(categories, ordered=False))
        result = df.pivot_table(index='B', columns='A', values='C',
                                dropna=dropna)
        expected_columns = Series(['a', 'b', 'c'], name='A')
        expected_columns = expected_columns.astype(
            CDT(categories, ordered=False))
        expected_index = Series([1, 2, 3], name='B')
        expected = DataFrame([[0, 3, 6],
                              [1, 4, 7],
                              [2, 5, 8]],
                             index=expected_index,
                             columns=expected_columns,)
        if not dropna:
            # add back the non observed to compare
            expected = expected.reindex(
                columns=Categorical(categories)).astype('float')

        tm.assert_frame_equal(result, expected)

    def test_pivot_with_non_observable_dropna(self, dropna):
        # gh-21133
        df = pd.DataFrame(
            {'A': pd.Categorical([np.nan, 'low', 'high', 'low', 'high'],
                                 categories=['low', 'high'],
                                 ordered=True),
             'B': range(5)})

        result = df.pivot_table(index='A', values='B', dropna=dropna)
        expected = pd.DataFrame(
            {'B': [2, 3]},
            index=pd.Index(
                pd.Categorical.from_codes([0, 1],
                                          categories=['low', 'high'],
                                          ordered=True),
                name='A'))

        tm.assert_frame_equal(result, expected)

        # gh-21378
        df = pd.DataFrame(
            {'A': pd.Categorical(['left', 'low', 'high', 'low', 'high'],
                                 categories=['low', 'high', 'left'],
                                 ordered=True),
             'B': range(5)})

        result = df.pivot_table(index='A', values='B', dropna=dropna)
        expected = pd.DataFrame(
            {'B': [2, 3, 0]},
            index=pd.Index(
                pd.Categorical.from_codes([0, 1, 2],
                                          categories=['low', 'high', 'left'],
                                          ordered=True),
                name='A'))

        tm.assert_frame_equal(result, expected)

    def test_pass_array(self):
        result = self.data.pivot_table(
            'D', index=self.data.A, columns=self.data.C)
        expected = self.data.pivot_table('D', index='A', columns='C')
        tm.assert_frame_equal(result, expected)

    def test_pass_function(self):
        result = self.data.pivot_table('D', index=lambda x: x // 5,
                                       columns=self.data.C)
        expected = self.data.pivot_table('D', index=self.data.index // 5,
                                         columns='C')
        tm.assert_frame_equal(result, expected)

    def test_pivot_table_multiple(self):
        index = ['A', 'B']
        columns = 'C'
        table = pivot_table(self.data, index=index, columns=columns)
        expected = self.data.groupby(index + [columns]).agg(np.mean).unstack()
        tm.assert_frame_equal(table, expected)

    def test_pivot_dtypes(self):

        # can convert dtypes
        f = DataFrame({'a': ['cat', 'bat', 'cat', 'bat'], 'v': [
                      1, 2, 3, 4], 'i': ['a', 'b', 'a', 'b']})
        assert f.dtypes['v'] == 'int64'

        z = pivot_table(f, values='v', index=['a'], columns=[
                        'i'], fill_value=0, aggfunc=np.sum)
        result = z.get_dtype_counts()
        expected = Series(dict(int64=2))
        tm.assert_series_equal(result, expected)

        # cannot convert dtypes
        f = DataFrame({'a': ['cat', 'bat', 'cat', 'bat'], 'v': [
                      1.5, 2.5, 3.5, 4.5], 'i': ['a', 'b', 'a', 'b']})
        assert f.dtypes['v'] == 'float64'

        z = pivot_table(f, values='v', index=['a'], columns=[
                        'i'], fill_value=0, aggfunc=np.mean)
        result = z.get_dtype_counts()
        expected = Series(dict(float64=2))
        tm.assert_series_equal(result, expected)

    @pytest.mark.parametrize('columns,values',
                             [('bool1', ['float1', 'float2']),
                              ('bool1', ['float1', 'float2', 'bool1']),
                              ('bool2', ['float1', 'float2', 'bool1'])])
    def test_pivot_preserve_dtypes(self, columns, values):
        # GH 7142 regression test
        v = np.arange(5, dtype=np.float64)
        df = DataFrame({'float1': v, 'float2': v + 2.0,
                        'bool1': v <= 2, 'bool2': v <= 3})

        df_res = df.reset_index().pivot_table(
            index='index', columns=columns, values=values)

        result = dict(df_res.dtypes)
        expected = {col: np.dtype('O') if col[0].startswith('b')
                    else np.dtype('float64') for col in df_res}
        assert result == expected

    def test_pivot_no_values(self):
        # GH 14380
        idx = pd.DatetimeIndex(['2011-01-01', '2011-02-01', '2011-01-02',
                                '2011-01-01', '2011-01-02'])
        df = pd.DataFrame({'A': [1, 2, 3, 4, 5]},
                          index=idx)
        res = df.pivot_table(index=df.index.month, columns=df.index.day)

        exp_columns = pd.MultiIndex.from_tuples([('A', 1), ('A', 2)])
        exp = pd.DataFrame([[2.5, 4.0], [2.0, np.nan]],
                           index=[1, 2], columns=exp_columns)
        tm.assert_frame_equal(res, exp)

        df = pd.DataFrame({'A': [1, 2, 3, 4, 5],
                           'dt': pd.date_range('2011-01-01', freq='D',
                                               periods=5)},
                          index=idx)
        res = df.pivot_table(index=df.index.month,
                             columns=pd.Grouper(key='dt', freq='M'))
        exp_columns = pd.MultiIndex.from_tuples([('A',
                                                  pd.Timestamp('2011-01-31'))])
        exp_columns.names = [None, 'dt']
        exp = pd.DataFrame([3.25, 2.0],
                           index=[1, 2], columns=exp_columns)
        tm.assert_frame_equal(res, exp)

        res = df.pivot_table(index=pd.Grouper(freq='A'),
                             columns=pd.Grouper(key='dt', freq='M'))
        exp = pd.DataFrame([3],
                           index=pd.DatetimeIndex(['2011-12-31']),
                           columns=exp_columns)
        tm.assert_frame_equal(res, exp)

    def test_pivot_multi_values(self):
        result = pivot_table(self.data, values=['D', 'E'],
                             index='A', columns=['B', 'C'], fill_value=0)
        expected = pivot_table(self.data.drop(['F'], axis=1),
                               index='A', columns=['B', 'C'], fill_value=0)
        tm.assert_frame_equal(result, expected)

    def test_pivot_multi_functions(self):
        f = lambda func: pivot_table(self.data, values=['D', 'E'],
                                     index=['A', 'B'], columns='C',
                                     aggfunc=func)
        result = f([np.mean, np.std])
        means = f(np.mean)
        stds = f(np.std)
        expected = concat([means, stds], keys=['mean', 'std'], axis=1)
        tm.assert_frame_equal(result, expected)

        # margins not supported??
        f = lambda func: pivot_table(self.data, values=['D', 'E'],
                                     index=['A', 'B'], columns='C',
                                     aggfunc=func, margins=True)
        result = f([np.mean, np.std])
        means = f(np.mean)
        stds = f(np.std)
        expected = concat([means, stds], keys=['mean', 'std'], axis=1)
        tm.assert_frame_equal(result, expected)

    @pytest.mark.parametrize('method', [True, False])
    def test_pivot_index_with_nan(self, method):
        # GH 3588
        nan = np.nan
        df = DataFrame({'a': ['R1', 'R2', nan, 'R4'],
                        'b': ['C1', 'C2', 'C3', 'C4'],
                        'c': [10, 15, 17, 20]})
        if method:
            result = df.pivot('a', 'b', 'c')
        else:
            result = pd.pivot(df, 'a', 'b', 'c')
        expected = DataFrame([[nan, nan, 17, nan], [10, nan, nan, nan],
                              [nan, 15, nan, nan], [nan, nan, nan, 20]],
                             index=Index([nan, 'R1', 'R2', 'R4'], name='a'),
                             columns=Index(['C1', 'C2', 'C3', 'C4'], name='b'))
        tm.assert_frame_equal(result, expected)
        tm.assert_frame_equal(df.pivot('b', 'a', 'c'), expected.T)

        # GH9491
        df = DataFrame({'a': pd.date_range('2014-02-01', periods=6, freq='D'),
                        'c': 100 + np.arange(6)})
        df['b'] = df['a'] - pd.Timestamp('2014-02-02')
        df.loc[1, 'a'] = df.loc[3, 'a'] = nan
        df.loc[1, 'b'] = df.loc[4, 'b'] = nan

        if method:
            pv = df.pivot('a', 'b', 'c')
        else:
            pv = pd.pivot(df, 'a', 'b', 'c')
        assert pv.notna().values.sum() == len(df)

        for _, row in df.iterrows():
            assert pv.loc[row['a'], row['b']] == row['c']

        if method:
            result = df.pivot('b', 'a', 'c')
        else:
            result = pd.pivot(df, 'b', 'a', 'c')
        tm.assert_frame_equal(result, pv.T)

    @pytest.mark.parametrize('method', [True, False])
    def test_pivot_with_tz(self, method):
Loading ...