"""
Define the SeriesGroupBy, DataFrameGroupBy, and PanelGroupBy
classes that hold the groupby interfaces (and some implementations).
These are user facing as the result of the ``df.groupby(...)`` operations,
which here returns a DataFrameGroupBy object.
"""
import collections
import copy
from functools import partial
from textwrap import dedent
import warnings
import numpy as np
from pandas._libs import Timestamp, lib
import pandas.compat as compat
from pandas.compat import lzip, map
from pandas.compat.numpy import _np_version_under1p13
from pandas.errors import AbstractMethodError
from pandas.util._decorators import Appender, Substitution
from pandas.core.dtypes.cast import maybe_downcast_to_dtype
from pandas.core.dtypes.common import (
ensure_int64, ensure_platform_int, is_bool, is_datetimelike,
is_integer_dtype, is_interval_dtype, is_numeric_dtype, is_scalar)
from pandas.core.dtypes.missing import isna, notna
import pandas.core.algorithms as algorithms
from pandas.core.arrays import Categorical
from pandas.core.base import DataError, SpecificationError
import pandas.core.common as com
from pandas.core.frame import DataFrame
from pandas.core.generic import NDFrame, _shared_docs
from pandas.core.groupby import base
from pandas.core.groupby.groupby import (
GroupBy, _apply_docs, _transform_template)
from pandas.core.index import CategoricalIndex, Index, MultiIndex
import pandas.core.indexes.base as ibase
from pandas.core.internals import BlockManager, make_block
from pandas.core.panel import Panel
from pandas.core.series import Series
from pandas.plotting._core import boxplot_frame_groupby
class NDFrameGroupBy(GroupBy):
def _iterate_slices(self):
if self.axis == 0:
# kludge
if self._selection is None:
slice_axis = self.obj.columns
else:
slice_axis = self._selection_list
slicer = lambda x: self.obj[x]
else:
slice_axis = self.obj.index
slicer = self.obj.xs
for val in slice_axis:
if val in self.exclusions:
continue
yield val, slicer(val)
def _cython_agg_general(self, how, alt=None, numeric_only=True,
min_count=-1):
new_items, new_blocks = self._cython_agg_blocks(
how, alt=alt, numeric_only=numeric_only, min_count=min_count)
return self._wrap_agged_blocks(new_items, new_blocks)
def _wrap_agged_blocks(self, items, blocks):
    """
    Assemble aggregated blocks into a new object of the same type.

    Parameters
    ----------
    items : Index-like
        Labels for the first axis of the new block manager; must equal
        the first axis computed here (asserted).
    blocks : list
        Aggregated blocks passed to ``BlockManager``.

    Returns
    -------
    object
        ``type(self._obj_with_exclusions)`` built from the new manager and
        passed through ``self._post_process_cython_aggregate``.
    """
    source = self._obj_with_exclusions
    axes = list(source._data.axes)
    result_index = self.grouper.result_index

    # more kludge
    if self.axis != 0:
        axes[self.axis] = result_index
    else:
        # the old second axis moves to the front, the group index follows
        axes[0], axes[1] = axes[1], result_index

    # Make sure block manager integrity check passes.
    assert axes[0].equals(items)
    axes[0] = items

    manager = BlockManager(blocks, axes)
    wrapped = type(source)(manager)
    return self._post_process_cython_aggregate(wrapped)
# Class-level default; not read anywhere in the visible part of this class.
# NOTE(review): presumably the block axis used for aggregation by
# subclasses -- confirm against the rest of the file.
_block_agg_axis = 0
def _cython_agg_blocks(self, how, alt=None, numeric_only=True,
min_count=-1):
# TODO: the actual managing of mgr_locs is a PITA
# here, it should happen via BlockManager.combine
data, agg_axis = self._get_data_to_aggregate()
if numeric_only:
data = data.get_numeric_data(copy=False)
new_blocks = []
new_items = []
deleted_items = []
for block in data.blocks:
locs = block.mgr_locs.as_array
try:
result, _ = self.grouper.aggregate(
block.values, how, axis=agg_axis, min_count=min_count)
except NotImplementedError:
# generally if we have numeric_only=False
# and non-applicable functions
# try to python agg
if alt is None:
# we cannot perform the operation
# in an alternate way, exclude the block
deleted_items.append(locs)
continue
# call our grouper again with only this block
from pandas.core.groupby.groupby import groupby
obj = self.obj[data.items[locs]]
s = groupby(obj, self.grouper)
result = s.aggregate(lambda x: alt(x, axis=self.axis))
finally:
# see if we can cast the block back to the original dtype
result = block._try_coerce_and_cast_result(result)
newb = block.make_block(result)
new_items.append(locs)
new_blocks.append(newb)
if len(new_blocks) == 0:
raise DataError('No numeric types to aggregate')
# reset the locs in the blocks to correspond to our
# current ordering
indexer = np.concatenate(new_items)
new_items = data.items.take(np.sort(indexer))
if len(deleted_items):
# we need to adjust the indexer to account for the
# items we have removed
# really should be done in internals :<
deleted = np.concatenate(deleted_items)
ai = np.arange(len(data))
mask = np.zeros(len(data))
mask[deleted] = 1
indexer = (ai - mask.cumsum())[indexer]
offset = 0
for b in new_blocks:
loc = len(b.mgr_locs)
b.mgr_locs = indexer[offset:(offset + loc)]
offset += loc
return new_items, new_blocks
def _get_data_to_aggregate(self):
obj = self._obj_with_exclusions
if self.axis == 0:
return obj.swapaxes(0, 1)._data, 1
else:
return obj._data, self.axis
def _post_process_cython_aggregate(self, obj):
# undoing kludge from below
if self.axis == 0:
obj = obj.swapaxes(0, 1)
return obj
def aggregate(self, arg, *args, **kwargs):
    """
    Aggregate using ``arg``, trying progressively more generic paths.

    Parameters
    ----------
    arg : object
        Aggregation specification, forwarded to ``self._aggregate``.
    *args, **kwargs
        Extra arguments for the aggregation. A ``_level`` keyword is
        popped and passed on separately.

    Returns
    -------
    object
        The result of ``self._aggregate`` when it reports ``how is None``;
        otherwise the aggregated result after ``_convert(datetime=True)``.
    """
    _level = kwargs.pop('_level', None)
    result, how = self._aggregate(arg, _level=_level, *args, **kwargs)
    if how is None:
        return result

    if result is None:

        # grouper specific aggregations
        if self.grouper.nkeys > 1:
            return self._python_agg_general(arg, *args, **kwargs)

        if args or kwargs:
            # The list path below cannot forward extra arguments, so go
            # straight to the generic path. This is an explicit branch
            # rather than an ``assert`` so that the behaviour is the same
            # when assertions are disabled (``python -O``).
            result = self._aggregate_generic(arg, *args, **kwargs)
        else:
            # try to treat as if we are passing a list
            try:
                result = self._aggregate_multiple_funcs(
                    [arg], _level=_level, _axis=self.axis)
                result.columns = Index(
                    result.columns.levels[0],
                    name=self._selected_obj.columns.name)
            except Exception:
                result = self._aggregate_generic(arg, *args, **kwargs)

    if not self.as_index:
        self._insert_inaxis_grouper_inplace(result)
        result.index = np.arange(len(result))

    return result._convert(datetime=True)


agg = aggregate
def _aggregate_generic(self, func, *args, **kwargs):
if self.grouper.nkeys != 1:
raise AssertionError('Number of keys must be 1')
axis = self.axis
obj = self._obj_with_exclusions
result = {}
if axis != obj._info_axis_number:
try:
for name, data in self:
result[name] = self._try_cast(func(data, *args, **kwargs),
data)
except Exception:
return self._aggregate_item_by_item(func, *args, **kwargs)
else:
for name in self.indices:
try:
data = self.get_group(name, obj=obj)
result[name] = self._try_cast(func(data, *args, **kwargs),
data)
except Exception:
wrapper = lambda x: func(x, *args, **kwargs)
result[name] = data.apply(wrapper, axis=axis)
return self._wrap_generic_output(result, obj)
def _wrap_aggregated_output(self, output, names=None):
raise AbstractMethodError(self)
def _aggregate_item_by_item(self, func, *args, **kwargs):
    """
    Aggregate each item of the object separately with a ``SeriesGroupBy``.

    Items whose aggregation raises ``ValueError`` or ``TypeError`` are
    dropped from the result columns.

    Parameters
    ----------
    func : object
        Forwarded to ``SeriesGroupBy.aggregate`` for every item.
    *args, **kwargs
        Forwarded alongside ``func``.

    Returns
    -------
    DataFrame
        One column per item that could be aggregated.

    Raises
    ------
    TypeError
        The last ``TypeError`` seen, when no column is left (GH6337).
    """
    # only for axis==0

    obj = self._obj_with_exclusions
    aggregated = {}
    skipped = []
    last_type_error = None
    for item in obj:
        try:
            column = obj[item]
            col_groupby = SeriesGroupBy(column, selection=item,
                                        grouper=self.grouper)
            col_result = col_groupby.aggregate(func, *args, **kwargs)
            aggregated[item] = self._try_cast(col_result, column)
        except ValueError:
            skipped.append(item)
        except TypeError as err:
            skipped.append(item)
            last_type_error = err

    columns = obj.columns
    if skipped:
        columns = columns.drop(skipped)

        # GH6337
        if last_type_error is not None and not len(columns):
            raise last_type_error

    return DataFrame(aggregated, columns=columns)
def _decide_output_index(self, output, labels):
if len(output) == len(labels):
output_keys = labels
else:
output_keys = sorted(output)
try:
output_keys.sort()
except Exception: # pragma: no cover
pass
if isinstance(labels, MultiIndex):
output_keys = MultiIndex.from_tuples(output_keys,
names=labels.names)
return output_keys
def _wrap_applied_output(self, keys, values, not_indexed_same=False):
from pandas.core.index import _all_indexes_same
from pandas.core.tools.numeric import to_numeric
if len(keys) == 0:
return DataFrame(index=keys)
key_names = self.grouper.names
# GH12824.
def first_not_none(values):
try:
return next(com._not_none(*values))
except StopIteration:
return None
v = first_not_none(values)
if v is None:
# GH9684. If all values are None, then this will throw an error.
# We'd prefer it return an empty dataframe.
return DataFrame()
elif isinstance(v, DataFrame):
return self._concat_objects(keys, values,
not_indexed_same=not_indexed_same)
elif self.grouper.groupings is not None:
if len(self.grouper.groupings) > 1:
key_index = self.grouper.result_index
else:
ping = self.grouper.groupings[0]
if len(keys) == ping.ngroups:
key_index = ping.group_index
key_index.name = key_names[0]
key_lookup = Index(keys)
indexer = key_lookup.get_indexer(key_index)
# reorder the values
values = [values[i] for i in indexer]
else:
key_index = Index(keys, name=key_names[0])
# don't use the key indexer
if not self.as_index:
key_index = None
# make Nones an empty object
v = first_not_none(values)
if v is None:
return DataFrame()
Loading ...