Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
modin / core / dataframe / algebra / default2pandas / groupby.py
Size: Mime:
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership.  The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""Module houses default GroupBy functions builder class."""

from .default import DefaultMethod

import pandas
from pandas.core.dtypes.common import is_list_like


# FIXME: there is no sense in keeping `GroupBy` and `GroupByDefault` logic in separate
# classes. They should be combined.
class GroupBy:
    """
    Builder for GroupBy aggregation functions.

    The builders return plain functions that take a pandas DataFrame together with
    the groupby/aggregation arguments and compute the GroupBy result with pandas.
    They back the default-to-pandas implementations of ``QueryCompiler.groupby_*``.
    """

    # Keys for which ``build_groupby`` builds an aggregation function
    # (``build_aggregate_method``) instead of a reduce one.
    agg_aliases = [
        "agg",
        "dict_agg",
        pandas.core.groupby.DataFrameGroupBy.agg,
        pandas.core.groupby.DataFrameGroupBy.aggregate,
    ]

    @classmethod
    def validate_by(cls, by):
        """
        Build valid `by` parameter for `pandas.DataFrame.groupby`.

        Cast all DataFrames in `by` parameter to Series or list of Series in case
        of multi-column frame.

        Parameters
        ----------
        by : DataFrame, Series, index label or list of such
            Object which indicates groups for GroupBy.

        Returns
        -------
        Series, index label or list of such
            By parameter with all DataFrames cast to Series. A DataFrame or a Series
            passed as `by` is returned as a list. Series named "__reduced__" come
            back unnamed; the caller's objects are not modified.
        """

        def try_cast_series(df):
            """Cast one-column frame to Series and drop the internal "__reduced__" name."""
            if isinstance(df, pandas.DataFrame):
                df = df.squeeze(axis=1)
            if not isinstance(df, pandas.Series):
                # Multi-column frames and plain labels are returned unchanged.
                return df
            if df.name == "__reduced__":
                # Rename a shallow copy so that the caller's Series is not mutated.
                df = df.copy(deep=False)
                df.name = None
            return df

        if isinstance(by, pandas.DataFrame):
            by = [try_cast_series(column) for _, column in by.items()]
        elif isinstance(by, pandas.Series):
            by = [try_cast_series(by)]
        elif isinstance(by, list):
            by = [try_cast_series(o) for o in by]
        return by

    @classmethod
    def inplace_applyier_builder(cls, key, func=None):
        """
        Bind actual aggregation function to the GroupBy aggregation method.

        Parameters
        ----------
        key : callable
            Function that takes GroupBy object and evaluates passed aggregation function.
        func : callable or str, optional
            Function that takes DataFrame and aggregates its data. Will be applied
            to each group at the grouped frame.

        Returns
        -------
        callable,
            Function that executes aggregation under GroupBy object. It calls
            ``key(grp, func, *args, **kwargs)`` (``func`` omitted when None).
        """
        inplace_args = [] if func is None else [func]

        def inplace_applyier(grp, *func_args, **func_kwargs):
            return key(grp, *inplace_args, *func_args, **func_kwargs)

        return inplace_applyier

    @classmethod
    def get_func(cls, key, **kwargs):
        """
        Extract aggregation function from groupby arguments.

        Parameters
        ----------
        key : callable or str
            Default aggregation function. If aggregation function is not specified
            via groupby arguments, then `key` function is used.
        **kwargs : dict
            GroupBy arguments that may contain aggregation function.

        Returns
        -------
        callable
            Aggregation function.

        Notes
        -----
        There are two ways of how groupby aggregation can be invoked:
            1. Explicitly with query compiler method: `qc.groupby_sum()`.
            2. By passing aggregation function as an argument: `qc.groupby_agg("sum")`.
        Both are going to produce the same result, however in the first case actual aggregation
        function can be extracted from the method name, while for the second only from the method arguments.
        """
        # "agg_func" takes precedence over "func_dict" when both are present.
        if "agg_func" in kwargs:
            return cls.inplace_applyier_builder(key, kwargs["agg_func"])
        elif "func_dict" in kwargs:
            return cls.inplace_applyier_builder(key, kwargs["func_dict"])
        else:
            return cls.inplace_applyier_builder(key)

    @classmethod
    def build_aggregate_method(cls, key):
        """
        Build function for `QueryCompiler.groupby_agg` that can be executed as default-to-pandas.

        Parameters
        ----------
        key : callable or str
            Default aggregation function. If aggregation function is not specified
            via groupby arguments, then `key` function is used.

        Returns
        -------
        callable
            Function that executes groupby aggregation.
        """

        def fn(
            df,
            by,
            axis,
            groupby_kwargs,
            agg_args,
            agg_kwargs,
            drop=False,
            **kwargs,
        ):
            """Group DataFrame and apply aggregation function to each group."""
            by = cls.validate_by(by)

            grp = df.groupby(by, axis=axis, **groupby_kwargs)
            agg_func = cls.get_func(key, **kwargs)
            result = agg_func(grp, *agg_args, **agg_kwargs)

            return result

        return fn

    @classmethod
    def build_groupby_reduce_method(cls, agg_func):
        """
        Build function for `QueryCompiler.groupby_*` that can be executed as default-to-pandas.

        Parameters
        ----------
        agg_func : callable or str
            Default aggregation function. If aggregation function is not specified
            via groupby arguments, then `agg_func` function is used.

        Returns
        -------
        callable
            Function that executes groupby aggregation.
        """

        def fn(
            df, by, axis, groupby_kwargs, agg_args, agg_kwargs, drop=False, **kwargs
        ):
            """Group DataFrame and apply aggregation function to each group."""
            if not isinstance(by, (pandas.Series, pandas.DataFrame)):
                # Labels / lists: let pandas handle every groupby argument,
                # including `as_index`.
                by = cls.validate_by(by)
                grp = df.groupby(by, axis=axis, **groupby_kwargs)
                grp_agg_func = cls.get_func(agg_func, **kwargs)
                return grp_agg_func(
                    grp,
                    *agg_args,
                    **agg_kwargs,
                )

            if isinstance(by, pandas.DataFrame):
                by = by.squeeze(axis=1)
            if (
                drop
                and isinstance(by, pandas.Series)
                and by.name in df
                and df[by.name].equals(by)
            ):
                # The 'by' Series is a column of `df`: group on its label instead.
                by = by.name
            if isinstance(by, pandas.DataFrame):
                # Multi-column 'by': append its missing columns to `df` and group on labels.
                df = pandas.concat([df] + [by[[o for o in by if o not in df]]], axis=1)
                by = list(by.columns)

            # Always compute with `as_index=True`; `as_index=False` is emulated below.
            groupby_kwargs = groupby_kwargs.copy()
            as_index = groupby_kwargs.pop("as_index", True)
            groupby_kwargs["as_index"] = True

            grp = df.groupby(by, axis=axis, **groupby_kwargs)
            func = cls.get_func(agg_func, **kwargs)
            result = func(grp, *agg_args, **agg_kwargs)

            if isinstance(result, pandas.Series):
                result = result.to_frame()

            if not as_index:
                method = kwargs.get("method")
                if isinstance(by, pandas.Series):
                    # 1. If `drop` is True then 'by' Series represents a column from the
                    #    source frame and so the 'by' is internal.
                    # 2. If method is 'size' then any 'by' is considered to be internal.
                    #    This is a hacky legacy from the ``groupby_size`` implementation:
                    #    https://github.com/modin-project/modin/issues/3739
                    internal_by = (by.name,) if drop or method == "size" else tuple()
                else:
                    internal_by = by

                cls.handle_as_index_for_dataframe(
                    result,
                    internal_by,
                    # NOTE(review): these are the dtypes of the *source* frame's index,
                    # while ``handle_as_index_for_dataframe`` documents this argument as the
                    # dtypes of the 'by' columns (the levels of ``result.index`` here).
                    # Kept unchanged to preserve behavior; confirm before switching.
                    by_cols_dtypes=(
                        df.index.dtypes.values
                        if isinstance(df.index, pandas.MultiIndex)
                        else (df.index.dtype,)
                    ),
                    by_length=cls._get_by_length(by),
                    drop=drop,
                    method=method,
                    inplace=True,
                )

            if result.index.name == "__reduced__":
                result.index.name = None

            return result

        return fn

    @staticmethod
    def _get_by_length(by):
        """
        Count the grouping keys of an already-normalized `by`.

        Parameters
        ----------
        by : list, Series or hashable label
            `by` as normalized inside ``build_groupby_reduce_method``: a list of
            column labels, a single Series, or a single column label.

        Returns
        -------
        int
            ``len(by)`` for a list and 1 otherwise. ``len`` cannot be used directly:
            for a Series it counts rows, for a string label it counts characters,
            and it raises ``TypeError`` for labels without a length (e.g. ints).
        """
        return len(by) if isinstance(by, list) else 1

    @classmethod
    def is_aggregate(cls, key):  # noqa: PR01
        """Check whether `key` is an alias for pandas.GroupBy.aggregation method."""
        return key in cls.agg_aliases

    @classmethod
    def build_groupby(cls, func):
        """
        Build function that groups DataFrame and applies aggregation function to the every group.

        Parameters
        ----------
        func : callable or str
            Default aggregation function. If aggregation function is not specified
            via groupby arguments, then `func` function is used.

        Returns
        -------
        callable
            Function that takes pandas DataFrame and does GroupBy aggregation.
        """
        if cls.is_aggregate(func):
            return cls.build_aggregate_method(func)
        return cls.build_groupby_reduce_method(func)

    @staticmethod
    def handle_as_index_for_dataframe(
        result,
        internal_by_cols,
        by_cols_dtypes=None,
        by_length=None,
        selection=None,
        partition_idx=0,
        drop=True,
        method=None,
        inplace=False,
    ):
        """
        Handle `as_index=False` parameter for the passed GroupBy aggregation result.

        Parameters
        ----------
        result : DataFrame
            Frame containing GroupBy aggregation result computed with `as_index=True`
            parameter (group names are located at the frame's index).
        internal_by_cols : list-like
            Internal 'by' columns.
        by_cols_dtypes : list-like, optional
            Data types of the internal 'by' columns. Required to do special casing
            in case of categorical 'by'. If not specified, assume that there is no
            categorical data in 'by'.
        by_length : int, optional
            Amount of keys to group on (including frame columns and external objects like list, Series, etc.)
            If not specified, consider `by_length` to be equal ``len(internal_by_cols)``.
        selection : label or list of labels, optional
            Set of columns that were explicitly selected for aggregation (for example
            via dict-aggregation). If not specified assuming that aggregation was
            applied to all of the available columns.
        partition_idx : int, default: 0
            Positional index of the current partition.
        drop : bool, default: True
            Indicates whether or not any of the `by` data came from the same frame.
        method : str, optional
            Name of the groupby function. This is a hint to be able to do special casing.
            Note: this parameter is a legacy from the ``groupby_size`` implementation,
            it's a hacky one and probably will be removed in the future: https://github.com/modin-project/modin/issues/3739.
        inplace : bool, default: False
            Modify the DataFrame in place (do not create a new object).

        Returns
        -------
        DataFrame
            GroupBy aggregation result with the considered `as_index=False` parameter.
        """
        if not inplace:
            result = result.copy()

        reset_index, drop, lvls_to_drop, cols_to_drop = GroupBy.handle_as_index(
            result_cols=result.columns,
            result_index_names=result.index.names,
            internal_by_cols=internal_by_cols,
            by_cols_dtypes=by_cols_dtypes,
            by_length=by_length,
            selection=selection,
            partition_idx=partition_idx,
            drop=drop,
            method=method,
        )

        if len(lvls_to_drop) > 0:
            result.index = result.index.droplevel(lvls_to_drop)
        if len(cols_to_drop) > 0:
            result.drop(columns=cols_to_drop, inplace=True)
        if reset_index:
            result.reset_index(drop=drop, inplace=True)
        return result

    @staticmethod
    def handle_as_index(
        result_cols,
        result_index_names,
        internal_by_cols,
        by_cols_dtypes=None,
        by_length=None,
        selection=None,
        partition_idx=0,
        drop=True,
        method=None,
    ):
        """
        Compute hints to process ``as_index=False`` parameter for the GroupBy result.

        This function resolves naming conflicts of the index levels to insert and the column labels
        for the GroupBy result. The logic of this function assumes that the initial GroupBy result
        was computed as ``as_index=True``.

        Parameters
        ----------
        result_cols : pandas.Index
            Columns of the GroupBy result.
        result_index_names : list-like
            Index names of the GroupBy result.
        internal_by_cols : list-like
            Internal 'by' columns.
        by_cols_dtypes : list-like, optional
            Data types of the internal 'by' columns. Required to do special casing
            in case of categorical 'by'. If not specified, assume that there is no
            categorical data in 'by'.
        by_length : int, optional
            Amount of keys to group on (including frame columns and external objects like list, Series, etc.)
            If not specified, consider `by_length` to be equal ``len(internal_by_cols)``.
        selection : label or list of labels, optional
            Set of columns that were explicitly selected for aggregation (for example
            via dict-aggregation). If not specified assuming that aggregation was
            applied to all of the available columns.
        partition_idx : int, default: 0
            Positional index of the current partition.
        drop : bool, default: True
            Indicates whether or not any of the `by` data came from the same frame.
        method : str, optional
            Name of the groupby function. This is a hint to be able to do special casing.
            Note: this parameter is a legacy from the ``groupby_size`` implementation,
            it's a hacky one and probably will be removed in the future: https://github.com/modin-project/modin/issues/3739.

        Returns
        -------
        reset_index : bool
            Indicates whether to reset index to the default one (0, 1, 2 ... n) at this partition.
        drop_index : bool
            If `reset_index` is True, indicates whether to drop all index levels (True) or insert them into the
            resulting columns (False).
        lvls_to_drop : list of ints
            Contains numeric indices of the levels of the result index to drop as intersected.
        cols_to_drop : list of labels
            Contains labels of the columns to drop from the result as intersected.

        Examples
        --------
        >>> groupby_result = compute_groupby_without_processing_as_index_parameter()
        >>> if not as_index:
        >>>     reset_index, drop, lvls_to_drop, cols_to_drop = handle_as_index(**extract_required_params(groupby_result))
        >>>     if len(lvls_to_drop) > 0:
        >>>         groupby_result.index = groupby_result.index.droplevel(lvls_to_drop)
        >>>     if len(cols_to_drop) > 0:
        >>>         groupby_result = groupby_result.drop(columns=cols_to_drop)
        >>>     if reset_index:
        >>>         groupby_result_with_processed_as_index_parameter = groupby_result.reset_index(drop=drop)
        >>> else:
        >>>     groupby_result_with_processed_as_index_parameter = groupby_result
        """
        if by_length is None:
            by_length = len(internal_by_cols)

        reset_index = by_length > 0 or selection is not None

        # If the method is "size" then the result contains only one unique named column
        # and we don't have to worry about any naming conflicts, so inserting all of
        # the "by" into the result (just a fast-path)
        if method == "size":
            return reset_index, False, [], []

        # Pandas logic of resolving naming conflicts is the following:
        #   1. If any categorical is in 'by' and 'by' is multi-column, then the categorical
        #      index is prioritized: drop intersected columns and insert all of the 'by' index
        #      levels to the frame as columns.
        #   2. Otherwise, aggregation result is prioritized: drop intersected index levels and
        #      insert the filtered ones to the frame as columns.
        if by_cols_dtypes is not None:
            keep_index_levels = (
                by_length > 1
                and selection is None
                and any(isinstance(x, pandas.CategoricalDtype) for x in by_cols_dtypes)
            )
        else:
            keep_index_levels = False

        # 1. We insert 'by'-columns to the result at the beginning of the frame and so only to the
        #    first partition, if partition_idx != 0 we just drop the index. If there are no columns
        #    that are required to drop (keep_index_levels is False) then we can exit here.
        # 2. We don't insert 'by'-columns to the result if 'by'-data came from a different
        #    frame (drop is False). The only exception to this rule is the "size" method, which
        #    has already returned above, so the `method != "size"` check is always true here.
        if (not keep_index_levels and partition_idx != 0) or (
            not drop and method != "size"
        ):
            return reset_index, True, [], []

        if not isinstance(internal_by_cols, pandas.Index):
            if not is_list_like(internal_by_cols):
                internal_by_cols = [internal_by_cols]
            internal_by_cols = pandas.Index(internal_by_cols)

        # Internal "__reduced__*" labels are never inserted into the result.
        internal_by_cols = (
            internal_by_cols[~internal_by_cols.str.startswith("__reduced__", na=False)]
            if hasattr(internal_by_cols, "str")
            else internal_by_cols
        )

        if selection is not None and not isinstance(selection, pandas.Index):
            selection = pandas.Index(selection)

        lvls_to_drop = []
        cols_to_drop = []

        if not keep_index_levels:
            # We want to insert only these internal-by-cols that are not presented
            # in the result in order to not create naming conflicts
            if selection is None:
                cols_to_insert = frozenset(internal_by_cols) - frozenset(result_cols)
            else:
                cols_to_insert = frozenset(
                    # We have to use explicit 'not in' check and not just difference
                    # of sets because of specific '__contains__' operator in case of
                    # scalar 'col' and MultiIndex 'selection'.
                    col
                    for col in internal_by_cols
                    if col not in selection
                )
        else:
            cols_to_insert = internal_by_cols
            # We want to drop such internal-by-cols that are presented
            # in the result in order to not create naming conflicts
            cols_to_drop = frozenset(internal_by_cols) & frozenset(result_cols)

        if partition_idx == 0:
            lvls_to_drop = [
                i
                for i, name in enumerate(result_index_names)
                if name not in cols_to_insert
            ]
        else:
            lvls_to_drop = result_index_names

        # Dropping every level is equivalent to `reset_index(drop=True)`.
        drop = False
        if len(lvls_to_drop) == len(result_index_names):
            drop = True
            lvls_to_drop = []

        return reset_index, drop, lvls_to_drop, cols_to_drop


class GroupByDefault(DefaultMethod):
    """Builder for default-to-pandas GroupBy aggregation functions."""

    OBJECT_TYPE = "GroupBy"

    @classmethod
    def register(cls, func, **kwargs):
        """
        Build default-to-pandas GroupBy aggregation function.

        Parameters
        ----------
        func : callable
            Default aggregation function. If aggregation function is not specified
            via groupby arguments, then `func` function is used. Its ``__name__`` is
            forwarded to ``DefaultMethod.call`` as ``fn_name``.
        **kwargs : kwargs
            Additional arguments that will be passed to function builder.

        Returns
        -------
        callable
            Function that takes query compiler and defaults to pandas to do GroupBy
            aggregation.
        """
        pandas_groupby_fn = GroupBy.build_groupby(func)
        return cls.call(pandas_groupby_fn, fn_name=func.__name__, **kwargs)

    # Maps the `how` UDF-applying strategy to the `pandas.DataFrameGroupBy` method
    # that receives `agg_func`. Per the pandas documentation:
    #   * "axis_wise"  -> `.aggregate(func)`: func is applied row/column wise.
    #   * "group_wise" -> `.apply(func)`: func receives a DataFrame holding a whole group.
    #   * "transform"  -> `.transform(func)`: like `.apply()`, but the result is broadcast
    #                     back to the group's original shape.
    __aggregation_methods_dict = dict(
        axis_wise=pandas.core.groupby.DataFrameGroupBy.aggregate,
        group_wise=pandas.core.groupby.DataFrameGroupBy.apply,
        transform=pandas.core.groupby.DataFrameGroupBy.transform,
    )

    @classmethod
    def get_aggregation_method(cls, how):
        """
        Return `pandas.DataFrameGroupBy` method that implements the passed `how` UDF applying strategy.

        Parameters
        ----------
        how : {"axis_wise", "group_wise", "transform"}
            `how` parameter of the ``BaseQueryCompiler.groupby_agg``.

        Returns
        -------
        callable(pandas.DataFrameGroupBy, callable, *args, **kwargs) -> [pandas.DataFrame | pandas.Series]

        Notes
        -----
        Visit ``BaseQueryCompiler.groupby_agg`` doc-string for more information about `how` parameter.
        An unknown `how` raises ``KeyError``.
        """
        strategies = cls.__aggregation_methods_dict
        return strategies[how]