# NOTE: The lines below are package-index metadata accidentally captured with
# the source; kept as comments so the file remains valid Python.
# Repository URL to install this package: (not captured)
# Version: 0.1.3
# pylint: disable=fixme
from math import exp, log
from typing import Optional, Tuple, cast
from sarus_data_spec.typing import Dataset
from sarus_differential_privacy import query # type: ignore
import dp_xgboost as xgb
import pandas as pd
import sarus_data_spec.constants as const
from sarus_xgboost.protobuf.xgboost_pb2 import XgboostParameters
from sarus_xgboost.visitor import parse_xgboost
# pylint: disable=too-many-locals
def xgboost(
    features_dataset: Dataset,
    label_dataset: Dataset,
    parameters: XgboostParameters,
) -> Tuple[xgb.Booster, query.PrivateQuery]:
    """Train a DP-XGBoost model on features_dataset given labels and
    parameters.

    Categories are encoded by their values and labels are normalized to
    [0, 1].

    Args:
        features_dataset: Dataset holding the feature columns; must contain
            exactly one table.
        label_dataset: Dataset holding the labels, sharing the same table
            path as the features.
        parameters: Training hyper-parameters (objective, tree method,
            per-tree epsilon, depth, etc.).

    Returns:
        The trained booster and a private query describing the privacy
        consumption of the training.

    Raises:
        ValueError: If the features dataset does not contain exactly one
            table.
    """
    # Check the dataset format BEFORE indexing into the table list, so a
    # dataset with zero or several tables raises the intended ValueError
    # rather than an IndexError.
    if len(features_dataset.schema().tables()) != 1:
        raise ValueError(
            'This dataset is composed of several tables, which is not '
            'supported'
        )
    table_path = features_dataset.schema().tables()[0]
    features_schema = (
        features_dataset.schema().data_type().sub_types(table_path)[0]
    )
    label_schema = label_dataset.schema().data_type().sub_types(table_path)[0]
    # Retrieve id columns: a unique, non-label-referencing id column can be
    # used as the dataframe index.
    ids = [
        name
        for name, type in features_schema.children().items()
        if (
            type.protobuf().WhichOneof('type') == 'id'
            and type.protobuf().id.unique
            and not type.protobuf().id.reference.label
        )
    ]
    if len(ids) == 1:
        index: Optional[str] = ids[0]
    else:
        index = None
    # Dataset to pandas: unnest along the table path; only the innermost
    # level gets the id column as index.
    features = features_dataset.to_pandas()
    label = label_dataset.to_pandas()
    for path_node, table_index in zip(
        table_path.to_strings_list()[0],
        [None] * (len(table_path.to_strings_list()[0]) - 1)
        + [index],  # type: ignore
    ):
        features = pd.DataFrame.from_records(
            features[path_node],
            index=table_index,
        )
        label = pd.DataFrame.from_records(
            label[path_node],
            index=table_index,
        )
    # Get bounds / encode enums from the (DP) marginals statistics.
    stat = (
        features_dataset.marginals()
        .statistics()
        .nodes_statistics(table_path)[0]
    )
    feature_min, feature_max, features = parse_xgboost(
        features_schema, features, stat
    )
    stat = (
        label_dataset.marginals().statistics().nodes_statistics(table_path)[0]
    )
    _, _, label = parse_xgboost(label_schema, label, stat, normalize=True)
    # Bound user contribution: rescale row weights so no user exceeds the
    # dataset's maximum multiplicity.
    weights = rescale_weights(
        features_dataset.to_pandas(),
        user_col=const.USER_COLUMN,
        weight_col=const.WEIGHTS,
        max_multiplicity=features_dataset.marginals()
        .protobuf()
        .statistics.struct.multiplicity,
    )[const.WEIGHTS]
    # enable_categorical doesn't work with optional categories, hence the
    # cast of all columns to float.
    dtrain = xgb.DMatrix(
        features.astype(float),
        label=label.astype(float),
        feature_min=feature_min,
        feature_max=feature_max,
        weight=weights,
    )
    params = {
        'objective': parameters.objective,
        'tree_method': parameters.tree_method,
        'dp_epsilon_per_tree': parameters.dp_epsilon_per_tree,
        'max_depth': parameters.max_depth,
        'learning_rate': parameters.learning_rate,
        'lambda': parameters.lambd,
        'base_score': parameters.base_score,
        'subsample': parameters.subsample,
        'min_child_weight': parameters.min_child_weight,
        'nthread': parameters.nthread,
    }
    bst = cast(
        xgb.Booster,
        xgb.train(  # type: ignore[no-untyped-call]
            params, dtrain, num_boost_round=parameters.num_boost_rounds
        ),
    )
    # Privacy accounting with subsampling amplification:
    # eps_total = T * log(1 + q * (exp(eps_tree) - 1)).
    return bst, query.EpsilonQuery(
        epsilon=parameters.num_boost_rounds
        * log(
            1
            + parameters.subsample * (exp(parameters.dp_epsilon_per_tree) - 1)
        )
    )  # TODO: Actual privacy consumption, not EpsilonQuery
def rescale_weights(
    data: pd.DataFrame,
    user_col: str,
    weight_col: str,
    max_multiplicity: float,
) -> pd.DataFrame:
    """Rescale data weights so each user's private contribution is at most
    max_multiplicity.

    For every user whose weights sum to more than ``max_multiplicity``, all
    of that user's weights are scaled down by the same factor so they sum to
    exactly ``max_multiplicity``; users already at or below the bound are
    left unchanged (the factor is clipped at 1). Rows with a null user are
    not rescaled.

    Args:
        data: Input dataframe; it is copied, never mutated.
        user_col: Name of the column identifying each row's user.
        weight_col: Name of the column holding the row weights.
        max_multiplicity: Upper bound on any single user's total weight.

    Returns:
        A copy of ``data`` with the rescaled weight column.
    """
    data_copy = data.copy()
    # Compute the non-null-user mask once instead of re-evaluating it for
    # every selection below.
    has_user = data_copy[user_col].notnull()
    user_groupby = data_copy[has_user].groupby([user_col], dropna=False)
    # Per-user scaling factor, clipped at 1 so weights are never increased.
    user_weights = (
        (max_multiplicity / user_groupby[weight_col].sum())
        .clip(upper=1)
        .to_dict()
    )
    # Series.map with a dict is the idiomatic (and faster) equivalent of
    # apply(lambda y: user_weights[y]); every user key is present by
    # construction.
    data_copy.loc[has_user, weight_col] = (
        data_copy.loc[has_user, weight_col]
        * data_copy.loc[has_user, user_col].map(user_weights).values
    )
    return data_copy