Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / pandas   python

Repository URL to install this package:

Version: 0.24.2 

/ tests / test_multilevel.py

# -*- coding: utf-8 -*-
# pylint: disable-msg=W0612,E1101,W0141
import datetime
import itertools
from warnings import catch_warnings, simplefilter

import numpy as np
from numpy.random import randn
import pytest
import pytz

from pandas.compat import (
    StringIO, lrange, lzip, product as cart_product, range, u, zip)

from pandas.core.dtypes.common import is_float_dtype, is_integer_dtype

import pandas as pd
from pandas import DataFrame, Panel, Series, Timestamp, isna
from pandas.core.index import Index, MultiIndex
import pandas.util.testing as tm

# Aggregation method names (their consumers are outside the visible part of
# this file).
AGG_FUNCTIONS = [
    'sum', 'prod', 'min', 'max', 'median', 'mean',
    'skew', 'mad', 'std', 'var', 'sem',
]


class Base(object):
    """Shared fixture class for the hierarchical-index tests.

    ``setup_method`` populates these attributes before each test:

    * ``frame`` -- 10x3 random DataFrame on a two-level MultiIndex named
      ('first', 'second'); columns A, B, C live in an Index named 'exp'.
    * ``single_level`` -- a MultiIndex that has only one level ('first').
    * ``series`` -- 8-element random Series on a two-level MultiIndex whose
      fourth value is NaN.
    * ``tdf`` -- the frame returned by ``tm.makeTimeDataFrame(100)``.
    * ``ymd`` -- ``tdf`` summed per (year, month, day) of its index labels,
      with levels cast to int64 and named 'year', 'month', 'day'.
    """

    def setup_method(self, method):
        """Build fresh fixtures; the ``method`` argument is not used."""
        index = MultiIndex(levels=[['foo', 'bar', 'baz', 'qux'],
                                   ['one', 'two', 'three']],
                           codes=[[0, 0, 0, 1, 1, 2, 2, 3, 3, 3],
                                  [0, 1, 2, 0, 1, 1, 2, 0, 1, 2]],
                           names=['first', 'second'])
        self.frame = DataFrame(np.random.randn(10, 3), index=index,
                               columns=Index(['A', 'B', 'C'], name='exp'))

        self.single_level = MultiIndex(levels=[['foo', 'bar', 'baz', 'qux']],
                                       codes=[[0, 1, 2, 3]], names=['first'])

        # create test series object
        arrays = [['bar', 'bar', 'baz', 'baz', 'qux', 'qux', 'foo', 'foo'],
                  ['one', 'two', 'one', 'two', 'one', 'two', 'one', 'two']]
        tuples = lzip(*arrays)
        index = MultiIndex.from_tuples(tuples)
        s = Series(randn(8), index=index)
        # Write the 4th element by position explicitly (``.iloc``) instead of
        # relying on the integer-key fallback of ``s[3]``; ``np.nan`` is the
        # canonical spelling (the ``np.NaN`` alias is gone in NumPy 2.0).
        s.iloc[3] = np.nan
        self.series = s

        self.tdf = tm.makeTimeDataFrame(100)
        self.ymd = self.tdf.groupby([lambda x: x.year, lambda x: x.month,
                                     lambda x: x.day]).sum()

        # use Int64Index, to make sure things work
        # Reassign the index rather than mutate it with ``inplace=True``.
        int_levels = [lev.astype('i8') for lev in self.ymd.index.levels]
        new_index = self.ymd.index.set_levels(int_levels)
        self.ymd.index = new_index.set_names(['year', 'month', 'day'])


class TestMultiLevel(Base):

    def test_append(self):
        """Appending the two slices of ``self.frame`` rebuilds the original."""
        head = self.frame[:5]
        tail = self.frame[5:]

        # DataFrame.append
        rebuilt_frame = head.append(tail)
        tm.assert_frame_equal(rebuilt_frame, self.frame)

        # Series.append on column 'A'
        rebuilt_column = head['A'].append(tail['A'])
        tm.assert_series_equal(rebuilt_column, self.frame['A'])

    def test_append_index(self):
        """Check ``append`` across flat and multi-level indexes (gh-7112)."""
        floats = Index([1.1, 1.2, 1.3])
        dates = pd.date_range('2011-01-01', freq='D', periods=3,
                              tz='Asia/Tokyo')
        letters = Index(['A', 'B', 'C'])

        two_level = MultiIndex.from_arrays([floats, dates])
        three_level = MultiIndex.from_arrays([floats, dates, letters])

        # see gh-7112
        tokyo = pytz.timezone('Asia/Tokyo')
        stamps = [tokyo.localize(datetime.datetime(2011, 1, day))
                  for day in (1, 2, 3)]
        expected_tuples = [(1.1, stamps[0]),
                           (1.2, stamps[1]),
                           (1.3, stamps[2])]

        # flat index + MultiIndex: expected is a flat Index of scalars
        # followed by tuples
        result = floats.append(two_level)
        expected = Index([1.1, 1.2, 1.3] + expected_tuples)
        tm.assert_index_equal(result, expected)

        # MultiIndex + flat index: tuples first, then scalars
        result = two_level.append(floats)
        expected = Index(expected_tuples + [1.1, 1.2, 1.3])
        tm.assert_index_equal(result, expected)

        # MultiIndex + itself
        result = two_level.append(two_level)
        expected = MultiIndex.from_arrays([floats.append(floats),
                                           dates.append(dates)])
        tm.assert_index_equal(result, expected)

        # two-level + three-level: ``expected`` is reused from the case above
        result = two_level.append(three_level)
        tm.assert_index_equal(result, expected)

        # three-level + two-level: expected is an Index of 3-tuples
        # followed by 2-tuples
        result = three_level.append(two_level)
        triples = [(1.1, stamps[0], 'A'),
                   (1.2, stamps[1], 'B'),
                   (1.3, stamps[2], 'C')]
        expected = Index._simple_new(np.array(triples + expected_tuples),
                                     None)
        tm.assert_index_equal(result, expected)

    def test_dataframe_constructor(self):
        multi = DataFrame(np.random.randn(4, 4),
                          index=[np.array(['a', 'a', 'b', 'b']),
                                 np.array(['x', 'y', 'x', 'y'])])
        assert isinstance(multi.index, MultiIndex)
        assert not isinstance(multi.columns, MultiIndex)

        multi = DataFrame(np.random.randn(4, 4),
                          columns=[['a', 'a', 'b', 'b'],
                                   ['x', 'y', 'x', 'y']])
        assert isinstance(multi.columns, MultiIndex)

    def test_series_constructor(self):
        multi = Series(1., index=[np.array(['a', 'a', 'b', 'b']), np.array(
            ['x', 'y', 'x', 'y'])])
        assert isinstance(multi.index, MultiIndex)

        multi = Series(1., index=[['a', 'a', 'b', 'b'], ['x', 'y', 'x', 'y']])
        assert isinstance(multi.index, MultiIndex)

        multi = Series(lrange(4), index=[['a', 'a', 'b', 'b'],
                                         ['x', 'y', 'x', 'y']])
        assert isinstance(multi.index, MultiIndex)

    def test_reindex_level(self):
        """Reindexing month sums on level 1 equals a groupby transform."""
        target = self.ymd.index
        sums_by_month = self.ymd.sum(level='month')
        broadcast = self.ymd.groupby(level='month').transform(np.sum)

        # axis=0
        tm.assert_frame_equal(sums_by_month.reindex(target, level=1),
                              broadcast)

        # Series
        column_expected = (self.ymd['A']
                           .groupby(level='month')
                           .transform(np.sum))
        tm.assert_series_equal(sums_by_month['A'].reindex(target, level=1),
                               column_expected, check_names=False)

        # axis=1
        sums_by_month_t = self.ymd.T.sum(axis=1, level='month')
        tm.assert_frame_equal(
            sums_by_month_t.reindex(columns=target, level=1), broadcast.T)

    def test_binops_level(self):
        """Binary ops with ``level='month'`` match ops on broadcast sums."""
        for opname in ('sub', 'add', 'mul', 'div'):
            month_sums = self.ymd.sum(level='month')

            # DataFrame
            frame_op = getattr(DataFrame, opname)
            result = frame_op(self.ymd, month_sums, level='month')
            broadcasted = self.ymd.groupby(level='month').transform(np.sum)
            tm.assert_frame_equal(result, frame_op(self.ymd, broadcasted))

            # Series
            series_op = getattr(Series, opname)
            column = self.ymd['A']
            result = series_op(column, month_sums['A'], level='month')
            broadcasted = column.groupby(level='month').transform(np.sum)
            expected = series_op(column, broadcasted)
            expected.name = 'A'
            tm.assert_series_equal(result, expected)

    def test_pickle(self):
        """Each fixture frame and its transpose survives a pickle round trip."""
        candidates = (self.frame, self.frame.T, self.ymd, self.ymd.T)
        for original in candidates:
            restored = tm.round_trip_pickle(original)
            tm.assert_frame_equal(original, restored)

    def test_reindex(self):
        """Selecting a list of tuples via ``.loc``/``.ix`` matches ``.iloc``."""
        keys = [('foo', 'one'), ('bar', 'one')]
        expected = self.frame.iloc[[0, 3]]

        via_loc = self.frame.loc[keys]
        tm.assert_frame_equal(via_loc, expected)

        # warnings raised while using .ix are recorded, not surfaced
        with catch_warnings(record=True):
            simplefilter("ignore", DeprecationWarning)
            via_ix = self.frame.ix[keys]
        tm.assert_frame_equal(via_ix, expected)

    def test_reindex_preserve_levels(self):
        """Selecting by an index object returns that same object as the axis."""
        new_index = self.ymd.index[::10]

        # row axis
        assert self.ymd.reindex(new_index).index is new_index
        assert self.ymd.loc[new_index].index is new_index

        with catch_warnings(record=True):
            simplefilter("ignore", DeprecationWarning)
            via_ix = self.ymd.ix[new_index]
        assert via_ix.index is new_index

        # column axis, on the transposed frame
        transposed = self.ymd.T
        assert transposed.reindex(columns=new_index).columns is new_index
        assert transposed.loc[:, new_index].columns is new_index

    def test_repr_to_string(self):
        """Smoke test: ``repr`` and ``to_string`` run on the fixture frames."""
        subjects = [self.frame, self.ymd, self.frame.T, self.ymd.T]

        for obj in subjects:
            repr(obj)

        # every frame is written into the same buffer
        sink = StringIO()
        for obj in subjects:
            obj.to_string(buf=sink)

    def test_repr_name_coincide(self):
        index = MultiIndex.from_tuples([('a', 0, 'foo'), ('b', 1, 'bar')],
                                       names=['a', 'b', 'c'])

        df = DataFrame({'value': [0, 1]}, index=index)

        lines = repr(df).split('\n')
        assert lines[2].startswith('a 0 foo')

    def test_delevel_infer_dtype(self):
        tuples = [tuple
                  for tuple in cart_product(
                      ['foo', 'bar'], [10, 20], [1.0, 1.1])]
        index = MultiIndex.from_tuples(tuples, names=['prm0', 'prm1', 'prm2'])
        df = DataFrame(np.random.randn(8, 3), columns=['A', 'B', 'C'],
                       index=index)
        deleveled = df.reset_index()
        assert is_integer_dtype(deleveled['prm1'])
        assert is_float_dtype(deleveled['prm2'])

    def test_reset_index_with_drop(self):
        """Check result type, column count and index name of ``reset_index``."""
        # DataFrame, dropping the index
        dropped = self.ymd.reset_index(drop=True)
        assert len(dropped.columns) == len(self.ymd.columns)
        assert dropped.index.name == self.ymd.index.name

        series = self.series

        # Series, keeping the levels: one column per level plus the values
        as_frame = series.reset_index()
        assert isinstance(as_frame, DataFrame)
        assert len(as_frame.columns) == len(series.index.levels) + 1
        assert as_frame.index.name == series.index.name

        # Series, dropping the index
        as_series = series.reset_index(drop=True)
        assert isinstance(as_series, Series)
        assert as_series.index.name == series.index.name

    def test_count_level(self):
        """``count(level=...)`` agrees with the equivalent groupby count."""

        def _assert_counts_match_groupby(obj, axis=0):
            # compare every level of the chosen axis
            nlevels = obj._get_axis(axis).nlevels
            for level in range(nlevels):
                result = obj.count(axis=axis, level=level)
                grouped = obj.groupby(axis=axis, level=level).count()
                expected = grouped.reindex_like(result).astype('i8')
                tm.assert_frame_equal(result, expected)

        # put some missing values into both fixtures
        for obj in (self.frame, self.ymd):
            obj.iloc[1, [1, 2]] = np.nan
            obj.iloc[7, [0, 1]] = np.nan

        _assert_counts_match_groupby(self.frame)
        _assert_counts_match_groupby(self.ymd)
        _assert_counts_match_groupby(self.frame.T, axis=1)
        _assert_counts_match_groupby(self.ymd.T, axis=1)

        # can't call with level on regular DataFrame
        flat = tm.makeTimeDataFrame()
        with pytest.raises(TypeError, match='hierarchical'):
            flat.count(level=0)

        # with numeric_only=True the string column 'D' is left out
        self.frame['D'] = 'foo'
        numeric_counts = self.frame.count(level=0, numeric_only=True)
        tm.assert_index_equal(numeric_counts.columns,
                              Index(list('ABC'), name='exp'))

    def test_count_level_series(self):
        """``Series.count(level=...)`` agrees with a groupby count.

        The groupby result is reindexed to the count result's index and its
        missing entries are filled with 0 before the comparison.
        """
        levels = [['foo', 'bar', 'baz'], ['one', 'two', 'three', 'four']]
        codes = [[0, 0, 0, 2, 2], [2, 0, 1, 1, 2]]
        index = MultiIndex(levels=levels, codes=codes)

        s = Series(np.random.randn(len(index)), index=index)

        for level in (0, 1):
            result = s.count(level=level)
            grouped = s.groupby(level=level).count()
            expected = grouped.reindex(result.index).fillna(0)
            tm.assert_series_equal(result.astype('f8'), expected)

    def test_count_level_corner(self):
        """Counting by level on zero-row slices gives zeros per level value."""
        empty_column = self.frame['A'][:0]
        first_level = empty_column.index.levels[0]

        # Series
        result = empty_column.count(level=0)
        tm.assert_series_equal(result,
                               Series(0, index=first_level, name='A'))

        # DataFrame
        empty_frame = self.frame[:0]
        result = empty_frame.count(level=0)
        expected = DataFrame({}, index=first_level,
                             columns=empty_frame.columns)
        expected = expected.fillna(0).astype(np.int64)
        tm.assert_frame_equal(result, expected)

    def test_get_level_number_out_of_bounds(self):
        """Level numbers 2 and -3 raise IndexError on the two-level index."""
        index = self.frame.index

        # past the last level
        with pytest.raises(IndexError, match="Too many levels"):
            index._get_level_number(2)

        # negative number beyond the first level
        with pytest.raises(IndexError, match="not a valid level number"):
            index._get_level_number(-3)

    def test_unstack(self):
        """Smoke test: ``unstack`` runs on float, int and int32 data."""
        # just check that it works for now
        self.ymd.unstack().unstack()

        # test that ints and int32 work
        for dtype in (int, np.int32):
            self.ymd.astype(dtype).unstack()

    def test_unstack_multiple_no_empty_columns(self):
        index = MultiIndex.from_tuples([(0, 'foo', 0), (0, 'bar', 0), (
            1, 'baz', 1), (1, 'qux', 1)])

        s = Series(np.random.randn(4), index=index)

        unstacked = s.unstack([1, 2])
        expected = unstacked.dropna(axis=1, how='all')
Loading ...