Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

agriconnect / pandas   python

Repository URL to install this package:

Version: 0.24.2 

/ tests / indexing / common.py

""" common utilities """

import itertools
from warnings import catch_warnings, filterwarnings

import numpy as np
import pytest

from pandas.compat import lrange

from pandas.core.dtypes.common import is_scalar

from pandas import (
    DataFrame, Float64Index, MultiIndex, Panel, Series, UInt64Index,
    date_range)
from pandas.util import testing as tm

from pandas.io.formats.printing import pprint_thing

_verbose = False


def _mklbl(prefix, n):
    return ["%s%s" % (prefix, i) for i in range(n)]


def _axify(obj, key, axis):
    # create a tuple accessor
    axes = [slice(None)] * obj.ndim
    axes[axis] = key
    return tuple(axes)


@pytest.mark.filterwarnings("ignore:\\nPanel:FutureWarning")
class Base(object):
    """ indexing comprehensive base class """

    _objs = {'series', 'frame', 'panel'}
    _typs = {'ints', 'uints', 'labels', 'mixed', 'ts', 'floats', 'empty',
             'ts_rev', 'multi'}

    def setup_method(self, method):

        self.series_ints = Series(np.random.rand(4), index=lrange(0, 8, 2))
        self.frame_ints = DataFrame(np.random.randn(4, 4),
                                    index=lrange(0, 8, 2),
                                    columns=lrange(0, 12, 3))
        with catch_warnings(record=True):
            self.panel_ints = Panel(np.random.rand(4, 4, 4),
                                    items=lrange(0, 8, 2),
                                    major_axis=lrange(0, 12, 3),
                                    minor_axis=lrange(0, 16, 4))

        self.series_uints = Series(np.random.rand(4),
                                   index=UInt64Index(lrange(0, 8, 2)))
        self.frame_uints = DataFrame(np.random.randn(4, 4),
                                     index=UInt64Index(lrange(0, 8, 2)),
                                     columns=UInt64Index(lrange(0, 12, 3)))
        self.panel_uints = Panel(np.random.rand(4, 4, 4),
                                 items=UInt64Index(lrange(0, 8, 2)),
                                 major_axis=UInt64Index(lrange(0, 12, 3)),
                                 minor_axis=UInt64Index(lrange(0, 16, 4)))

        self.series_floats = Series(np.random.rand(4),
                                    index=Float64Index(range(0, 8, 2)))
        self.frame_floats = DataFrame(np.random.randn(4, 4),
                                      index=Float64Index(range(0, 8, 2)),
                                      columns=Float64Index(range(0, 12, 3)))
        self.panel_floats = Panel(np.random.rand(4, 4, 4),
                                  items=Float64Index(range(0, 8, 2)),
                                  major_axis=Float64Index(range(0, 12, 3)),
                                  minor_axis=Float64Index(range(0, 16, 4)))

        m_idces = [MultiIndex.from_product([[1, 2], [3, 4]]),
                   MultiIndex.from_product([[5, 6], [7, 8]]),
                   MultiIndex.from_product([[9, 10], [11, 12]])]

        self.series_multi = Series(np.random.rand(4),
                                   index=m_idces[0])
        self.frame_multi = DataFrame(np.random.randn(4, 4),
                                     index=m_idces[0],
                                     columns=m_idces[1])
        self.panel_multi = Panel(np.random.rand(4, 4, 4),
                                 items=m_idces[0],
                                 major_axis=m_idces[1],
                                 minor_axis=m_idces[2])

        self.series_labels = Series(np.random.randn(4), index=list('abcd'))
        self.frame_labels = DataFrame(np.random.randn(4, 4),
                                      index=list('abcd'), columns=list('ABCD'))
        self.panel_labels = Panel(np.random.randn(4, 4, 4),
                                  items=list('abcd'),
                                  major_axis=list('ABCD'),
                                  minor_axis=list('ZYXW'))

        self.series_mixed = Series(np.random.randn(4), index=[2, 4, 'null', 8])
        self.frame_mixed = DataFrame(np.random.randn(4, 4),
                                     index=[2, 4, 'null', 8])
        self.panel_mixed = Panel(np.random.randn(4, 4, 4),
                                 items=[2, 4, 'null', 8])

        self.series_ts = Series(np.random.randn(4),
                                index=date_range('20130101', periods=4))
        self.frame_ts = DataFrame(np.random.randn(4, 4),
                                  index=date_range('20130101', periods=4))
        self.panel_ts = Panel(np.random.randn(4, 4, 4),
                              items=date_range('20130101', periods=4))

        dates_rev = (date_range('20130101', periods=4)
                     .sort_values(ascending=False))
        self.series_ts_rev = Series(np.random.randn(4),
                                    index=dates_rev)
        self.frame_ts_rev = DataFrame(np.random.randn(4, 4),
                                      index=dates_rev)
        self.panel_ts_rev = Panel(np.random.randn(4, 4, 4),
                                  items=dates_rev)

        self.frame_empty = DataFrame({})
        self.series_empty = Series({})
        self.panel_empty = Panel({})

        # form agglomerates
        for o in self._objs:

            d = dict()
            for t in self._typs:
                d[t] = getattr(self, '%s_%s' % (o, t), None)

            setattr(self, o, d)

    def generate_indices(self, f, values=False):
        """ generate the indices
        if values is True , use the axis values
        is False, use the range
        """

        axes = f.axes
        if values:
            axes = [lrange(len(a)) for a in axes]

        return itertools.product(*axes)

    def get_result(self, obj, method, key, axis):
        """ return the result for this obj with this key and this axis """

        if isinstance(key, dict):
            key = key[axis]

        # use an artificial conversion to map the key as integers to the labels
        # so ix can work for comparisons
        if method == 'indexer':
            method = 'ix'
            key = obj._get_axis(axis)[key]

        # in case we actually want 0 index slicing
        with catch_warnings(record=True):
            try:
                xp = getattr(obj, method).__getitem__(_axify(obj, key, axis))
            except AttributeError:
                xp = getattr(obj, method).__getitem__(key)

        return xp

    def get_value(self, f, i, values=False):
        """ return the value for the location i """

        # check against values
        if values:
            return f.values[i]

        # this is equiv of f[col][row].....
        # v = f
        # for a in reversed(i):
        #    v = v.__getitem__(a)
        # return v
        with catch_warnings(record=True):
            filterwarnings("ignore", "\\n.ix", DeprecationWarning)
            return f.ix[i]

    def check_values(self, f, func, values=False):

        if f is None:
            return
        axes = f.axes
        indicies = itertools.product(*axes)

        for i in indicies:
            result = getattr(f, func)[i]

            # check against values
            if values:
                expected = f.values[i]
            else:
                expected = f
                for a in reversed(i):
                    expected = expected.__getitem__(a)

            tm.assert_almost_equal(result, expected)

    def check_result(self, name, method1, key1, method2, key2, typs=None,
                     objs=None, axes=None, fails=None):
        def _eq(t, o, a, obj, k1, k2):
            """ compare equal for these 2 keys """

            if a is not None and a > obj.ndim - 1:
                return

            def _print(result, error=None):
                if error is not None:
                    error = str(error)
                v = ("%-16.16s [%-16.16s]: [typ->%-8.8s,obj->%-8.8s,"
                     "key1->(%-4.4s),key2->(%-4.4s),axis->%s] %s" %
                     (name, result, t, o, method1, method2, a, error or ''))
                if _verbose:
                    pprint_thing(v)

            try:
                rs = getattr(obj, method1).__getitem__(_axify(obj, k1, a))

                try:
                    xp = self.get_result(obj, method2, k2, a)
                except Exception:
                    result = 'no comp'
                    _print(result)
                    return

                detail = None

                try:
                    if is_scalar(rs) and is_scalar(xp):
                        assert rs == xp
                    elif xp.ndim == 1:
                        tm.assert_series_equal(rs, xp)
                    elif xp.ndim == 2:
                        tm.assert_frame_equal(rs, xp)
                    elif xp.ndim == 3:
                        tm.assert_panel_equal(rs, xp)
                    result = 'ok'
                except AssertionError as e:
                    detail = str(e)
                    result = 'fail'

                # reverse the checks
                if fails is True:
                    if result == 'fail':
                        result = 'ok (fail)'

                _print(result)
                if not result.startswith('ok'):
                    raise AssertionError(detail)

            except AssertionError:
                raise
            except Exception as detail:

                # if we are in fails, the ok, otherwise raise it
                if fails is not None:
                    if isinstance(detail, fails):
                        result = 'ok (%s)' % type(detail).__name__
                        _print(result)
                        return

                result = type(detail).__name__
                raise AssertionError(_print(result, error=detail))

        if typs is None:
            typs = self._typs

        if objs is None:
            objs = self._objs

        if axes is not None:
            if not isinstance(axes, (tuple, list)):
                axes = [axes]
            else:
                axes = list(axes)
        else:
            axes = [0, 1, 2]

        # check
        for o in objs:
            if o not in self._objs:
                continue

            d = getattr(self, o)
            for a in axes:
                for t in typs:
                    if t not in self._typs:
                        continue

                    obj = d[t]
                    if obj is None:
                        continue

                    def _call(obj=obj):
                        obj = obj.copy()

                        k2 = key2
                        _eq(t, o, a, obj, key1, k2)

                    # Panel deprecations
                    if isinstance(obj, Panel):
                        with catch_warnings():
                            filterwarnings("ignore", "\nPanel*", FutureWarning)
                            _call()
                    else:
                        _call()