import typing as t
import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.orm import Session
# importing sarus data spec here creates a circular import
VIEW_ENABLED = False  # For now, make simplifying assumptions on the dataset
def rescale_weights(
data: pd.DataFrame,
user_col: str,
private_col: str,
weight_col: str,
max_multiplicity: float,
) -> pd.DataFrame:
"""Rescale data weights so each user's private contribution is at max
max_multiplicity in L1 norm"""
# Assuming check_is_private is a function that validates the data
data_copy = data.copy()
is_private = ~data_copy[private_col].astype('bool')
# Compute the sum of weights for each user using vectorized operations
user_weights_sum = (
data_copy[is_private]
.groupby(user_col, dropna=False, sort=False)[weight_col]
.transform('sum')
)
# Calculate the scaling factor for each user
scaling_factor = (max_multiplicity / user_weights_sum).clip(upper=1)
# Apply the scaling factor to the weight column
data_copy.loc[is_private, weight_col] *= scaling_factor
return data_copy
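
# A minimal usage sketch of rescale_weights (illustrative only; the column
# names and values below are made up). A user whose private weights sum to
# more than max_multiplicity gets every weight scaled down proportionally,
# while users already at or below the cap are left untouched.
def _demo_rescale_weights() -> pd.DataFrame:
    df = pd.DataFrame(
        {
            "user": ["a", "a", "b"],
            "is_public": [False, False, False],  # False = private row
            "weight": [3.0, 1.0, 0.5],
        }
    )
    # user "a": weights become 0.75 and 0.25 (L1 norm clipped to 1.0)
    # user "b": weight stays 0.5 (already below the cap)
    return rescale_weights(df, "user", "is_public", "weight", 1.0)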
def rescale_l2_weights(
data: pd.DataFrame,
data_col: str,
user_col: str,
private_col: str,
weight_col: str,
max_multiplicity: float,
) -> pd.DataFrame:
"""Rescale data weights so each user's private contribution is at max
max_multiplicity in L2 norm"""
check_is_private(data, user_col, private_col)
data_copy = data.copy()
    condition = data_copy[private_col].astype('bool')
user_groupby = data_copy[~condition].groupby(
[user_col, data_col], dropna=False
)
    # Sum the weights per privacy entity (PE) sharing the same category,
    # square the per-category sums, then reset to a flat index
    data_summed = (user_groupby[weight_col].sum() ** 2).reset_index()
user_weights = (
max_multiplicity
/ np.sqrt(data_summed.groupby(user_col)[weight_col].sum())
).clip(upper=1)
    if isinstance(user_weights, pd.DataFrame):
        # with a single PE this is already a Series and must not be
        # squeezed; only squeeze the DataFrame case
        user_weights = user_weights.squeeze()
user_weights = user_weights.to_dict()
data_copy.loc[~condition, weight_col] = data_copy.loc[
~condition, weight_col
] * (
data_copy.loc[~condition, user_col]
.apply(lambda y: user_weights[y])
.values
)
return data_copy
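
# A minimal sketch of the L2 rescaling (made-up data). For user "a" the
# per-category weight sums are 3.0 (x) and 1.0 (y), so the L2 norm is
# sqrt(3^2 + 1^2) = sqrt(10) and every weight of "a" is scaled by
# 1.0 / sqrt(10).
def _demo_rescale_l2_weights() -> pd.DataFrame:
    df = pd.DataFrame(
        {
            "category": ["x", "x", "y"],
            "user": ["a", "a", "a"],
            "is_public": [False, False, False],  # False = private row
            "weight": [2.0, 1.0, 1.0],
        }
    )
    return rescale_l2_weights(
        df, "category", "user", "is_public", "weight", 1.0
    )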
def check_is_private(
data: pd.DataFrame, user_col: str, private_col: str
) -> None:
"""Check integrity of a dataset for private_col.
If private_col is True (the row is private), the user must not be None"""
if len(data[(data[user_col].isnull()) & (~data[private_col])]) > 0:
raise ValueError( # is ValueError the right choice ?
"A row in the dataset is set to private but has "
"privacy_unit = None"
)
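
# For instance (made-up data), this raises: the second row is private
# (private_col is False) but its user is null:
#     check_is_private(
#         pd.DataFrame({"user": ["a", None], "public": [False, False]}),
#         "user",
#         "public",
#     )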
def generator_from_seed(seed: t.Optional[int] = None) -> np.random.Generator:
"""Build a random generator from a seed"""
return np.random.default_rng(seed)
# SQL utils
def laplace_mechanism(
sqlalchemy_expr: sa.sql.ColumnElement,
noise: float,
) -> sa.sql.ColumnElement:
"""
transform an sqlalchemy_expr to add Laplace noise of parameter `noise`
"""
urn = sa.func.random()
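    # Inverse-CDF sampling: -noise * sign(u - 1/2) * ln(1 - 2|u - 1/2|) is
    # Laplace(scale=noise) for u ~ Uniform(0, 1). Note that `urn` is
    # rendered as a separate random() call at each occurrence; since
    # random() is volatile in most dialects, the magnitude and sign terms
    # are then independent draws, and the result is still Laplace-distributed.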
return sqlalchemy_expr - noise * sa.func.log(
1 - 2 * sa.func.abs(urn - 0.5)
) * sa.func.sign(urn - 0.5)
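
# A numpy sketch of the same inverse-CDF construction (for intuition only;
# the function name is illustrative): for u ~ Uniform(0, 1),
# -noise * sign(u - 0.5) * log(1 - 2 * abs(u - 0.5)) is Laplace(scale=noise).
def _demo_laplace_samples(
    noise: float, size: int, seed: t.Optional[int] = None
) -> np.ndarray:
    rng = generator_from_seed(seed)
    u = rng.uniform(0.0, 1.0, size)
    return -noise * np.sign(u - 0.5) * np.log(1 - 2 * np.abs(u - 0.5))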
def gaussian_mechanism(
sqlalchemy_expr: sa.sql.ColumnElement,
noise: float,
) -> sa.sql.ColumnElement:
"""
transform an sqlalchemy_expr to add Gaussian noise of parameter `noise`
"""
urn = sa.func.random()
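    # Box-Muller transform: sqrt(-2 ln u1) * cos(2 pi u2) is a standard
    # normal draw, and acos(-1.0) evaluates to pi in SQL. Assuming the
    # dialect evaluates each rendered random() call independently (volatile
    # semantics), the two occurrences of `urn` provide the two independent
    # uniforms the transform requires.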
return sqlalchemy_expr + noise * sa.func.sqrt(
-2 * sa.func.log(urn)
) * sa.func.cos(2 * sa.func.acos(-1.0) * urn)
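
# A numpy sketch of the same Box-Muller construction (illustrative name):
# for independent u1, u2 ~ Uniform(0, 1), sqrt(-2 ln u1) * cos(2 pi u2)
# is a standard normal, so scaling by `noise` gives N(0, noise^2).
def _demo_gaussian_samples(
    noise: float, size: int, seed: t.Optional[int] = None
) -> np.ndarray:
    rng = generator_from_seed(seed)
    u1 = rng.uniform(0.0, 1.0, size)
    u2 = rng.uniform(0.0, 1.0, size)
    return noise * np.sqrt(-2 * np.log(u1)) * np.cos(2 * np.pi * u2)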
def rescale_weights_sql(
session: Session,
table: sa.sql.FromClause,
data_col: str,
user_col: str,
private_col: str,
weight_col: str,
max_multiplicity: float,
) -> sa.sql.Alias:
"""Rescale data weights so each user's private contribution is at max
max_multiplicity in L1 norm
For now, we make the supposition that max_multiplicity is always 1 in big
data, so no need to clip.
"""
if not VIEW_ENABLED:
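        # Views are disabled: return the table with its columns relabeled,
        # leaving the weights unscaled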
return t.cast(
sa.sql.Alias,
session.query(
*[col.label(key) for key, col in table.c.items()]
).subquery(),
)
user_groupby = (
session.query(
getattr(table.c, user_col),
sa.func.sum(getattr(table.c, weight_col)).label(weight_col),
)
.where(getattr(table.c, private_col).is_(False))
.group_by(getattr(table.c, user_col))
.subquery()
)
user_weights = session.query(
getattr(user_groupby.c, user_col),
        sa.func.least(
            1.0,
            sa.cast(max_multiplicity, sa.Float())
            / getattr(user_groupby.c, weight_col),
        ).label("least"),
).subquery()
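    # Scale the weights of private rows (private_col == False) by each
    # user's clipped factor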
scaled_private_data = (
session.query(
getattr(table.c, data_col).label(data_col),
getattr(table.c, user_col),
getattr(table.c, private_col),
getattr(table.c, weight_col) * user_weights.c.least,
)
.where(getattr(table.c, private_col).is_(False))
.join(
user_weights,
getattr(table.c, user_col) == getattr(user_weights.c, user_col),
)
)
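    # Keep the non-private rows (private_col == True) unscaled and append
    # the scaled private rows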
scaled_private_data = (
session.query(
getattr(table.c, data_col).label(data_col),
getattr(table.c, user_col).label(user_col),
getattr(table.c, private_col).label(private_col),
(getattr(table.c, weight_col) * user_weights.c.least).label(
weight_col
),
)
.where(getattr(table.c, private_col))
.union_all(scaled_private_data)
)
return t.cast(sa.sql.Alias, scaled_private_data.subquery())
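
# A minimal usage sketch for the SQL path (illustrative only; the table
# and column names are made up, and with VIEW_ENABLED False the weights
# come back unscaled):
def _demo_rescale_weights_sql() -> sa.sql.Alias:
    engine = sa.create_engine("sqlite://")
    metadata = sa.MetaData()
    table = sa.Table(
        "demo",
        metadata,
        sa.Column("category", sa.String),
        sa.Column("user", sa.String),
        sa.Column("is_public", sa.Boolean),
        sa.Column("weight", sa.Float),
    )
    metadata.create_all(engine)
    with Session(engine) as session:
        return rescale_weights_sql(
            session, table, "category", "user", "is_public", "weight", 1.0
        )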
def group_by_results_to_dataframe(
    results: t.List, name_col: str, keys_name: t.Union[t.List[t.Any], t.Any]
) -> pd.DataFrame:
    """Build a DataFrame from (key, dp_result) pairs, indexed by the
    group-by keys, with the results in column `name_col`"""
    if len(results) > 0 and isinstance(results[0][0], tuple):
expanded_results = [(*key, dp_result) for key, dp_result in results]
else:
expanded_results = list(results)
if isinstance(keys_name, list):
columns = keys_name + [name_col]
else:
columns = [keys_name, name_col]
result_df = pd.DataFrame(expanded_results, columns=columns)
result_df.set_index(columns[:-1], inplace=True)
return result_df
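
# A usage sketch of group_by_results_to_dataframe (made-up values):
# group-by results arrive as (key, dp_result) pairs, with tuple keys for
# multi-column group-bys; the keys become the DataFrame index.
def _demo_group_by_results_to_dataframe() -> pd.DataFrame:
    results = [(("a", 1), 10.0), (("b", 2), 20.0)]
    # index: ("user", "bucket"); column: "count"
    return group_by_results_to_dataframe(results, "count", ["user", "bucket"])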