# Repository URL to install this package: (URL missing from this listing)
# Version: 4.5.4.dev1
import typing as t
from sarus_differential_privacy.query import LaplaceQuery, PrivateQuery
from sarus_statistics.ops.sum.local import sum_op
from sarus_statistics.ops.utils import group_by_results_to_dataframe
import numpy as np
import pandas as pd
from sarus_data_spec.constants import PUBLIC, PU_COLUMN, WEIGHTS
from sarus_data_spec.sarus_statistics.ops.data_utils import (
compute_rescaled_data_from_dataspec,
compute_rescaled_groupby_data_from_dataspec,
)
import sarus_data_spec.typing as st
class SumOp:
    """Noisy sum of a single column of a dataset.

    The ``noise`` value given at construction is forwarded unchanged to the
    local ``sum_op`` routine and is also the parameter of the
    ``LaplaceQuery`` returned by :meth:`private_query`.
    """

    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        """Store the dataset and the noise parameter.

        Args:
            dataset: Dataset the sum is computed on.
            noise: Noise parameter handed to ``sum_op`` and ``LaplaceQuery``.
        """
        self._dataset = dataset
        self.noise = noise

    def dataset(self) -> st.Dataset:
        """Return the dataset given at construction."""
        return self._dataset

    def value(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        """Compute the noisy sum of ``column_name``.

        The SQL backend is used when the dataset's manager reports the
        dataset as big data; otherwise the pandas backend is used.

        Args:
            column_name: Name of the column to sum.
            max_multiplicity: Forwarded to the rescaling helper and to
                ``sum_op``.
            bounds: Pair of floats forwarded to ``sum_op`` (presumably the
                lower/upper clipping bounds -- confirm against ``sum_op``).
            random_generator: Optional NumPy generator forwarded to
                ``sum_op``.

        Returns:
            Whatever the selected backend returns.

        Raises:
            NotImplementedError: For big-data datasets, because the SQL
                backend is not implemented.
        """
        dataset = self.dataset()
        backend = (
            self._sql
            if dataset.manager().is_big_data(dataset)
            else self._pandas
        )
        return backend(column_name, max_multiplicity, bounds, random_generator)

    def _sql(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        """SQL backend placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def _pandas(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        """Pandas backend: rescale the data, then delegate to ``sum_op``.

        Takes the same arguments as :meth:`value` and returns the result of
        ``sum_op``.
        """
        rescaled_data = compute_rescaled_data_from_dataspec(
            self.dataset(), max_multiplicity
        )
        # Hand sum_op only the columns it is told about below.
        selected = rescaled_data[[column_name, PU_COLUMN, PUBLIC, WEIGHTS]]
        return sum_op(
            selected,
            column_name,
            PU_COLUMN,
            PUBLIC,
            WEIGHTS,
            self.noise,
            bounds,
            max_multiplicity,
            random_generator,
            # The data was rescaled by the helper call above.
            is_data_already_scaled=True,
        )

    def private_query(self) -> PrivateQuery:
        """Return a ``LaplaceQuery`` built from this op's noise parameter."""
        return LaplaceQuery(self.noise)
class GroupBySumOp:
    """Noisy per-group sum of a single column of a dataset.

    The ``noise`` value given at construction is forwarded unchanged to the
    local ``sum_op`` routine for every group and is also the parameter of
    the ``LaplaceQuery`` returned by :meth:`private_query`.
    """

    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        """Store the dataset and the noise parameter.

        Args:
            dataset: Dataset the grouped sum is computed on.
            noise: Noise parameter handed to ``sum_op`` and ``LaplaceQuery``.
        """
        self._dataset = dataset
        self.noise = noise

    def dataset(self) -> st.Dataset:
        """Return the dataset given at construction."""
        return self._dataset

    def value(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """Compute the noisy sum of ``column_name`` for each group.

        The SQL backend is used when the dataset's manager reports the
        dataset as big data; otherwise the pandas backend is used.

        Args:
            column_name: Name of the column to sum.
            max_multiplicity: Forwarded to the rescaling helper and to
                ``sum_op``.
            bounds: Pair of floats forwarded to ``sum_op`` (presumably the
                lower/upper clipping bounds -- confirm against ``sum_op``).
            random_generator: Optional NumPy generator forwarded to
                ``sum_op``.
            keys_values: If given, only groups whose key is contained in
                this list are summed; ``None`` keeps every group.

        Returns:
            Whatever the selected backend returns.

        Raises:
            NotImplementedError: For big-data datasets, because the SQL
                backend is not implemented.
        """
        dataset = self.dataset()
        backend = (
            self._sql
            if dataset.manager().is_big_data(dataset)
            else self._pandas
        )
        return backend(
            column_name,
            max_multiplicity,
            bounds,
            random_generator,
            keys_values,
        )

    def _sql(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """SQL backend placeholder; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def _pandas(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """
        The pandas implementation of the groupby sum operation.

        We apply a DP implementation of the sum to all groups,
        using a multi-dimensional Laplace mechanism.

        To compute the noise to apply for each Laplace mechanism (for each
        group), we first need to calculate the L1 sensitivity (s1).
        The minimum noise is given by: 1/epsilon * s1

        With s0, s1, and s_infinite representing the L0, L1, and L_infinite
        sensitivities of the query q, respectively,
        the L1 sensitivity is bounded as follows:
            s1 <= s0 * s_infinite

        - s_infinite is bounded in the local implementation sum_op by
          'sensibility'.
        - s0 is bounded by 'max_multiplicity'.

        Therefore, the intended noise for each group is
        1/epsilon (=self.noise) * max_multiplicity * sensibility:
        - the factor 'max_multiplicity' is meant to be applied in this part
          of the code;
        - the factor 's_infinite' is applied in 'sum_op'.

        NOTE(review): this method passes ``self.noise`` to ``sum_op``
        unchanged and forwards ``max_multiplicity`` as a separate argument;
        no multiplication happens here. Confirm that ``sum_op`` applies the
        ``max_multiplicity`` factor itself, otherwise the noise is
        under-scaled relative to the description above.

        Note: For more information, see:
        https://github.com/google/differential-privacy/
        blob/main/common_docs/Differential_Privacy_Computations_In_Data_Pipelines.pdf
        """
        (
            rescaled_groupbydata,
            keys_name,
        ) = compute_rescaled_groupby_data_from_dataspec(
            self.dataset(), max_multiplicity
        )
        # The column selection is the same for every group; build it once.
        columns = [column_name, PU_COLUMN, PUBLIC, WEIGHTS]
        results = []
        for key, rescaled_group_data in rescaled_groupbydata:
            # With an explicit key list, skip every group that is not in it.
            if keys_values is not None and key not in keys_values:
                continue
            dp_sum = sum_op(
                rescaled_group_data[columns],
                column_name,
                PU_COLUMN,
                PUBLIC,
                WEIGHTS,
                self.noise,
                bounds,
                max_multiplicity,
                random_generator,
                # The data was rescaled by the helper call above.
                is_data_already_scaled=True,
            )
            results.append((key, dp_sum))
        return group_by_results_to_dataframe(results, "Sum", keys_name)

    def private_query(self) -> PrivateQuery:
        """Return a ``LaplaceQuery`` built from this op's noise parameter."""
        return LaplaceQuery(self.noise)