Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# pylint: disable=fixme

from math import exp, log
from typing import Optional, Tuple, cast

from sarus_data_spec.typing import Dataset
from sarus_differential_privacy import query  # type: ignore
import dp_xgboost as xgb
import pandas as pd
import sarus_data_spec.constants as const

from sarus_xgboost.protobuf.xgboost_pb2 import XgboostParameters
from sarus_xgboost.visitor import parse_xgboost


# pylint: disable=too-many-locals
def xgboost(
    features_dataset: Dataset,
    label_dataset: Dataset,
    parameters: XgboostParameters,
) -> Tuple[xgb.Booster, query.PrivateQuery]:
    """Train a DP-XGBoost model on features_dataset given labels and
    parameters.

    Categories are encoded by their values and labels are normalized to
    [0, 1].

    Args:
        features_dataset: single-table Sarus dataset holding the features.
        label_dataset: dataset holding the labels; assumed to share the
            same table path as ``features_dataset``.
        parameters: protobuf message carrying the XGBoost hyper-parameters.

    Returns:
        The trained booster and the private query describing the privacy
        budget consumed by the training.

    Raises:
        ValueError: if ``features_dataset`` is not made of exactly one
            table.
    """
    # Validate the dataset layout BEFORE indexing into the table list so a
    # malformed dataset raises the intended ValueError rather than an
    # IndexError (the check used to come after ``tables()[0]``).
    tables = features_dataset.schema().tables()
    if len(tables) != 1:
        raise ValueError(
            'This dataset is composed of several tables, which is not '
            'supported'
        )
    table_path = tables[0]
    features_schema = (
        features_dataset.schema().data_type().sub_types(table_path)[0]
    )
    # NOTE(review): the label schema is looked up with the *features* table
    # path — assumes both datasets share the same table layout; confirm
    # against callers.
    label_schema = label_dataset.schema().data_type().sub_types(table_path)[0]

    # Retrieve the unique, non-label id columns; if exactly one exists it
    # becomes the dataframe index below.
    ids = [
        name
        for name, child_type in features_schema.children().items()
        if (
            child_type.protobuf().WhichOneof('type') == 'id'
            and child_type.protobuf().id.unique
            and not child_type.protobuf().id.reference.label
        )
    ]
    index: Optional[str] = ids[0] if len(ids) == 1 else None

    # Dataset to pandas: unfold the nested records along the table path,
    # applying the id index only at the innermost (last) level.
    features = features_dataset.to_pandas()
    label = label_dataset.to_pandas()
    path_nodes = table_path.to_strings_list()[0]  # hoisted loop invariant
    for path_node, table_index in zip(
        path_nodes,
        [None] * (len(path_nodes) - 1) + [index],  # type: ignore
    ):
        features = pd.DataFrame.from_records(
            features[path_node],
            index=table_index,
        )
        label = pd.DataFrame.from_records(
            label[path_node],
            index=table_index,
        )

    # Get bounds / encode enums from the DP marginals.
    stat = (
        features_dataset.marginals()
        .statistics()
        .nodes_statistics(table_path)[0]
    )
    feature_min, feature_max, features = parse_xgboost(
        features_schema, features, stat
    )

    stat = (
        label_dataset.marginals().statistics().nodes_statistics(table_path)[0]
    )
    # Labels are normalized to [0, 1]; their bounds are not needed.
    _, _, label = parse_xgboost(label_schema, label, stat, normalize=True)

    # Bound each user's contribution by rescaling the per-row weights with
    # the multiplicity from the DP marginals.
    # NOTE(review): weights are computed from the raw (un-unfolded)
    # dataframe — presumably row-aligned with ``features``; verify.
    weights = rescale_weights(
        features_dataset.to_pandas(),
        user_col=const.USER_COLUMN,
        weight_col=const.WEIGHTS,
        max_multiplicity=features_dataset.marginals()
        .protobuf()
        .statistics.struct.multiplicity,
    )[const.WEIGHTS]

    # enable_categorical doesn't work with optional categories, hence the
    # cast of every column to float.
    dtrain = xgb.DMatrix(
        features.astype(float),
        label=label.astype(float),
        feature_min=feature_min,
        feature_max=feature_max,
        weight=weights,
    )

    params = {
        'objective': parameters.objective,
        'tree_method': parameters.tree_method,
        'dp_epsilon_per_tree': parameters.dp_epsilon_per_tree,
        'max_depth': parameters.max_depth,
        'learning_rate': parameters.learning_rate,
        # 'lambda' is a Python keyword, hence the protobuf field 'lambd'.
        'lambda': parameters.lambd,
        'base_score': parameters.base_score,
        'subsample': parameters.subsample,
        'min_child_weight': parameters.min_child_weight,
        'nthread': parameters.nthread,
    }

    bst = cast(
        xgb.Booster,
        xgb.train(  # type: ignore[no-untyped-call]
            params, dtrain, num_boost_round=parameters.num_boost_rounds
        ),
    )

    # Privacy accounting: per-tree epsilon amplified by subsampling,
    # composed over the boosting rounds.
    return bst, query.EpsilonQuery(
        epsilon=parameters.num_boost_rounds
        * log(
            1
            + parameters.subsample * (exp(parameters.dp_epsilon_per_tree) - 1)
        )
    )  # TODO: Actual privacy consumption, not EpsilonQuery


def rescale_weights(
    data: pd.DataFrame,
    user_col: str,
    weight_col: str,
    max_multiplicity: float,
) -> pd.DataFrame:
    """Rescale data weights so each user's total private contribution is at
    most ``max_multiplicity``.

    Rows with a null ``user_col`` are left untouched. For each known user,
    the rescaling factor is ``max_multiplicity / sum(weights)`` clipped to
    1, so weights are only ever scaled down.

    Args:
        data: dataframe holding at least ``user_col`` and ``weight_col``.
        user_col: column identifying the contributing user (may be null).
        weight_col: column holding the per-row weights to rescale.
        max_multiplicity: cap on each user's summed weight.

    Returns:
        A copy of ``data`` with rescaled weights; the input is not mutated.
    """
    data_copy = data.copy()
    # Hoist the invariant mask: rows attributed to a known user.
    known_user = data_copy[user_col].notnull()
    # Per-user downscaling factor, clipped to 1 so weights never grow.
    per_user_factor = (
        max_multiplicity
        / data_copy[known_user]
        .groupby([user_col], dropna=False)[weight_col]
        .sum()
    ).clip(upper=1)
    # Series.map replaces the former apply(lambda y: factors[y]); every
    # user value in the masked rows is a key of ``per_user_factor`` by
    # construction.
    data_copy.loc[known_user, weight_col] = data_copy.loc[
        known_user, weight_col
    ] * (
        data_copy.loc[known_user, user_col].map(per_user_factor).values
    )
    return data_copy