Why Gemfury? Push, build, and install: RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
sarus_xgboost / sarus_xgboost / visitor.py
Size: Mime:
from __future__ import annotations

import typing

from sklearn.preprocessing import OrdinalEncoder
import numpy as np
import pandas as pd
import sarus_data_spec.typing as sdt


# pylint: disable=redefined-builtin, unused-argument, too-many-statements
def parse_xgboost(
    _type: sdt.Type,
    data: pd.Series,
    stat: sdt.Statistics,
    normalize: bool = False,
) -> typing.Tuple[typing.List, typing.List, pd.Series]:
    """Prepare data for XGBoost according to its Sarus type.

    - ordinal-encode categoricals (Boolean, Enum) because DMatrix does
      not accept raw categorical values
    - clip numeric values to the bounds read from ``stat``
    - min-max normalize to [0, 1] when ``normalize`` is True (for labels)

    Args:
        _type: the Sarus type describing ``data``.
        data: the series (or dataframe, for Struct types) to transform.
        stat: the statistics matching ``_type``; bounds are read from it.
        normalize: if True, rescale numeric values to [0, 1].

    Returns:
        A tuple ``(features_min, features_max, transformed_data)`` where
        the two lists hold the per-feature bounds.
    """

    class ParseXGBoost(sdt.TypeVisitor):
        """Visitor collecting per-feature bounds and transforming data."""

        def __init__(self) -> None:
            super().__init__()
            # BUG FIX: these were class-level attributes, so the mutable
            # lists were shared across every (recursive) call of
            # parse_xgboost and bounds accumulated between invocations.
            # They must be per-instance state.
            self.features_min: typing.List[float] = []
            self.features_max: typing.List[float] = []
            self.data: pd.Series = pd.Series(dtype=object)

        def Null(self) -> None:
            # No usable values: degenerate [0, 0] bounds, data unchanged.
            self.features_min.append(0)
            self.features_max.append(0)
            self.data = data

        def Unit(self) -> None:
            # Single-valued type: degenerate [0, 0] bounds, data unchanged.
            self.features_min.append(0)
            self.features_max.append(0)
            self.data = data

        def Boolean(self) -> None:
            self.features_min.append(0)
            self.features_max.append(1)

            # we encode categoricals because DMatrix doesn't work otherwise
            points = stat.protobuf().boolean.distribution.boolean.points
            enc = OrdinalEncoder().fit(
                np.array([point.name for point in points]).reshape(-1, 1)
            )
            self.data = pd.Series(
                enc.transform(data.values.reshape(-1, 1)).squeeze(),
                index=data.index,
            )

        def Id(
            self,
            unique: bool,
            reference: typing.Optional[sdt.Path] = None,
            base: typing.Optional[sdt.IdBase] = None,
        ) -> None:
            # TODO: should ids be treated as categoricals?
            self.features_min.append(0)
            self.features_max.append(0)
            self.data = data

        def Integer(self, min: int, max: int, base: sdt.IntegerBase) -> None:
            distrib = stat.protobuf().integer.distribution.integer
            self.features_min.append(distrib.min)
            self.features_max.append(distrib.max)
            self.data = data.clip(distrib.min, distrib.max)
            if normalize:
                # BUG FIX: min-max scaling. The original divided by the
                # raw max, which only lands in [0, 1] when min == 0.
                span = distrib.max - distrib.min
                self.data = (self.data - distrib.min) / (span if span else 1)

        def Enum(
            self,
            name: str,
            name_values: typing.Sequence[typing.Tuple[str, int]],
            ordered: bool,
        ) -> None:
            self.features_min.append(0)
            self.features_max.append(1)

            # we encode categoricals because DMatrix doesn't work otherwise
            points = stat.protobuf().enum.distribution.enum.points
            enc = OrdinalEncoder().fit(
                np.array([point.name for point in points]).reshape(-1, 1)
            )
            self.data = pd.Series(
                enc.transform(data.values.reshape(-1, 1)).squeeze(),
                index=data.index,
            )
            if normalize:
                # Codes start at 0, so dividing by (n_categories - 1)
                # already maps onto [0, 1].
                self.data /= len(points) - 1

        def Float(self, min: float, max: float, base: sdt.FloatBase) -> None:
            distrib = stat.protobuf().float.distribution.double
            f32 = np.finfo(np.float32)
            # Clamp the bounds into the float32 range. NOTE: the builtin
            # min/max are shadowed by the parameters, hence the explicit
            # conditionals.
            min32 = f32.min if distrib.min < f32.min else distrib.min
            # BUG FIX: the original condition was `distrib.max < f32.max`,
            # which replaced almost every max with float32's max.
            max32 = f32.max if distrib.max > f32.max else distrib.max
            self.features_min.append(min32)
            self.features_max.append(max32)
            # BUG FIX: clip with the clamped lower bound (the original
            # used the unclamped distrib.min here).
            self.data = data.clip(min32, max32)
            if normalize:
                # BUG FIX: min-max scaling (was `(x - min32) / max32`).
                span = max32 - min32
                self.data = (self.data - min32) / (span if span else 1)

        def Text(self, encoding: str) -> None:
            raise NotImplementedError

        def Bytes(self) -> None:
            raise NotImplementedError

        def Struct(
            self,
            fields: typing.Mapping[str, sdt.Type],
            name: typing.Optional[str] = None,
        ) -> None:
            # Recurse on every field present in the data and concatenate
            # the bounds; rebuild the dataframe column by column.
            # NOTE(review): `normalize` is deliberately not forwarded
            # (original behavior) — confirm labels are never structs.
            self.data = pd.DataFrame()
            children = stat.children()
            for key, value in fields.items():
                if key not in data.columns:
                    continue
                f_min, f_max, subdata = parse_xgboost(
                    value, data[key], children[key]
                )
                self.features_min += f_min
                self.features_max += f_max
                self.data[key] = subdata

        def Constrained(
            self,
            type: sdt.Type,
            constraint: sdt.Predicate,
            name: typing.Optional[str] = None,
        ) -> None:
            raise NotImplementedError

        def List(
            self,
            type: sdt.Type,
            max_size: int,
            name: typing.Optional[str] = None,
        ) -> None:
            raise NotImplementedError

        def Array(
            self,
            type: sdt.Type,
            shape: typing.Tuple[int, ...],
            name: typing.Optional[str] = None,
        ) -> None:
            raise NotImplementedError

        def Optional(
            self, type: sdt.Type, name: typing.Optional[str] = None
        ) -> None:
            # Transform only the non-null entries with the inner type.
            f_min, f_max, subdata = parse_xgboost(
                type, data[data.notnull()], list(stat.children().values())[0]
            )
            self.features_min += f_min
            self.features_max += f_max
            # BUG FIX: work on a copy — the original wrote the transformed
            # values back into the caller's series in place.
            filled = data.copy()
            filled[filled.notnull()] = subdata
            self.data = filled

        def Union(
            self,
            fields: typing.Mapping[str, sdt.Type],
            name: typing.Optional[str] = None,
        ) -> None:
            raise NotImplementedError

        def Datetime(
            self, format: str, min: str, max: str, base: sdt.DatetimeBase
        ) -> None:
            # Datetimes are handled like integers on their integer
            # (timestamp) distribution.
            distrib = stat.protobuf().datetime.distribution.integer
            self.features_min.append(distrib.min)
            self.features_max.append(distrib.max)
            self.data = data.clip(distrib.min, distrib.max)
            if normalize:
                # BUG FIX: min-max scaling (was `(x - min) / max`).
                span = distrib.max - distrib.min
                self.data = (self.data - distrib.min) / (span if span else 1)

    visitor = ParseXGBoost()
    _type.accept(visitor)

    return visitor.features_min, visitor.features_max, visitor.data