Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
import typing as t
from sarus_differential_privacy.query import (
ComposedQuery,
LaplaceQuery,
PrivateQuery,
)
from sarus_statistics.ops.histograms.local import (
dataset_length,
private_histogram,
)
from sarus_statistics.ops.utils import group_by_results_to_dataframe
import numpy as np
import pandas as pd
from sarus_data_spec.constants import DATA, PUBLIC, PU_COLUMN, WEIGHTS
from sarus_data_spec.sarus_statistics.ops.data_utils import (
compute_full_data_from_dataspec,
compute_groupby_data_from_dataspec,
)
import sarus_data_spec.typing as st
class CountOp:
    """Noisy count of the rows of a dataset.

    The op keeps a dataset and a noise level. The count itself is computed
    by ``dataset_length``; ``private_query`` reports the ``LaplaceQuery``
    built from the same noise level.
    """

    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        self.noise = noise
        self._dataset = dataset

    def dataset(self) -> st.Dataset:
        """Return the dataset this op was constructed with."""
        return self._dataset

    def value(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        """Compute the count with the backend suited to the dataset.

        The dataset's manager decides (through ``is_big_data``) whether the
        SQL or the pandas implementation is used.

        Args:
            column_name: Optional column; see ``_pandas`` for its effect.
            max_multiplicity: Forwarded unchanged to the backend.
            random_generator: Optional generator forwarded to the backend.

        Returns:
            The value produced by the selected backend.

        Raises:
            NotImplementedError: When the SQL backend is selected.
        """
        target = self.dataset()
        backend = (
            self._sql
            if target.manager().is_big_data(target)
            else self._pandas
        )
        return backend(column_name, max_multiplicity, random_generator)

    def _sql(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        """SQL backend placeholder; always raises NotImplementedError."""
        raise NotImplementedError

    def _pandas(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        """Pandas backend of the count.

        With a non-empty ``column_name`` the full data is loaded, narrowed to
        that column plus the bookkeeping columns, and rows where the column
        is null are dropped. Otherwise only the bookkeeping columns of the
        dataset's pandas representation are kept. The resulting frame is
        handed to ``dataset_length`` together with the noise level.
        """
        if column_name:
            full = compute_full_data_from_dataspec(self.dataset())
            selected = full[[column_name, PU_COLUMN, PUBLIC, WEIGHTS]]
            # Null entries of the requested column are excluded.
            frame = selected.dropna(subset=[column_name])
        else:
            raw = t.cast(pd.DataFrame, self.dataset().to_pandas())
            frame = raw[[PU_COLUMN, PUBLIC, WEIGHTS]]

        return dataset_length(
            frame,
            PU_COLUMN,
            PUBLIC,
            WEIGHTS,
            self.noise,
            max_multiplicity,
            random_generator,
        )

    def private_query(self) -> PrivateQuery:
        """Return the ``LaplaceQuery`` built from this op's noise level."""
        return LaplaceQuery(self.noise)
class GroupByCountOp:
    """Noisy per-group count over a grouped dataset.

    Each selected group is counted with ``dataset_length`` using the noise
    level given at construction. ``private_query`` reports a
    ``ComposedQuery`` holding one ``LaplaceQuery``.
    """

    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        self.noise = noise
        self._dataset = dataset

    def dataset(self) -> st.Dataset:
        """Return the dataset this op was constructed with."""
        return self._dataset

    def value(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """Compute the per-group counts with the suitable backend.

        The dataset's manager decides (through ``is_big_data``) whether the
        SQL or the pandas implementation is used.

        Args:
            column_name: Forwarded to the backend.
            max_multiplicity: Forwarded to the backend.
            random_generator: Optional generator forwarded to the backend.
            keys_values: Optional list of group keys to restrict the
                result to; ``None`` keeps every group.

        Returns:
            The dataframe produced by the selected backend.

        Raises:
            NotImplementedError: When the SQL backend is selected.
        """
        target = self.dataset()
        backend = (
            self._sql
            if target.manager().is_big_data(target)
            else self._pandas
        )
        return backend(
            column_name, max_multiplicity, random_generator, keys_values
        )

    def _sql(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """SQL backend placeholder; always raises NotImplementedError."""
        raise NotImplementedError

    def _pandas(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """
        Pandas backend of the groupby count.

        A dp count is applied to every group, which amounts to a
        multi-dimensional Laplace mechanism. The noise used for the Laplace
        mechanism of each group is discussed in the docstring of the
        '_pandas' method of 'GroupBySumOp'.

        Note that ``column_name`` is not read by this implementation.
        """
        grouped, keys_name = compute_groupby_data_from_dataspec(
            self.dataset()
        )
        # Groups are visited in iteration order; a group is skipped before
        # any counting happens when its key is not among `keys_values`.
        per_group = [
            (
                key,
                dataset_length(
                    group_data,
                    PU_COLUMN,
                    PUBLIC,
                    WEIGHTS,
                    self.noise,
                    max_multiplicity,
                    random_generator,
                ),
            )
            for key, group_data in grouped
            if keys_values is None or key in keys_values
        ]
        return group_by_results_to_dataframe(per_group, "Count", keys_name)

    def private_query(self) -> PrivateQuery:
        """Return a ``ComposedQuery`` wrapping a single ``LaplaceQuery``."""
        laplace = LaplaceQuery(self.noise)
        return ComposedQuery([laplace])
class HistogramOp:
    """Noisy histogram of one column of a dataset.

    The per-category values come from ``private_histogram``; they are then
    rounded, optionally normalised and optionally sorted by value.
    """

    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
        normalize: bool = False,
        sort: bool = True,
        ascending: bool = False,
        bins: t.Optional[int] = None,
        dropna: bool = True,
    ):
        self._dataset = dataset
        self.noise = noise
        # Post-processing switches read by `_pandas`.
        self.normalize = normalize
        self.sort = sort
        self.ascending = ascending
        # NOTE(review): `bins` and `dropna` are stored but no method of this
        # class reads them.
        self.bins = bins
        self.dropna = dropna

    def dataset(self) -> st.Dataset:
        """Return the dataset this op was constructed with."""
        return self._dataset

    def value(
        self,
        column_name: str,
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> t.Dict[str, float]:
        """Compute the histogram with the backend suited to the dataset.

        The dataset's manager decides (through ``is_big_data``) whether the
        SQL or the pandas implementation is used.

        Args:
            column_name: Column whose values are histogrammed.
            max_multiplicity: Forwarded unchanged to the backend.
            random_generator: Optional generator forwarded to the backend.

        Returns:
            The mapping produced by the selected backend.

        Raises:
            NotImplementedError: When the SQL backend is selected.
        """
        target = self.dataset()
        backend = (
            self._sql
            if target.manager().is_big_data(target)
            else self._pandas
        )
        return backend(column_name, max_multiplicity, random_generator)

    def _sql(
        self,
        column_name: str,
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> t.Dict[str, float]:
        """SQL backend placeholder; always raises NotImplementedError."""
        raise NotImplementedError

    def _pandas(
        self,
        column_name: str,
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> t.Dict[str, float]:
        """Pandas backend of the histogram.

        Steps:
            1. Load the full data of the dataset.
            2. Read the list of categories from the SYNTHETIC variant of the
               dataset (presumably so that the category list does not depend
               on the private data -- to be confirmed).
            3. Call ``private_histogram`` and round every value.
            4. If ``normalize`` is set and the rounded values do not sum to
               zero, divide every value by that sum.
            5. If ``sort`` is set, order the entries by value, descending
               unless ``ascending`` is set.
        """
        full = compute_full_data_from_dataspec(self.dataset())

        synthetic = t.cast(
            st.Dataset,
            self.dataset().variant(
                kind=st.ConstraintKind.SYNTHETIC,
                public_context=[],
            ),
        ).to_pandas()
        # When the synthetic frame nests its records under DATA, expand them
        # into a flat frame before looking the column up.
        flat = (
            pd.DataFrame.from_records(synthetic[DATA].values.tolist())
            if DATA in synthetic.columns
            else synthetic
        )
        categories = flat[column_name].unique()

        noisy = private_histogram(
            full[[column_name, PU_COLUMN, PUBLIC, WEIGHTS]],
            column_name,
            PU_COLUMN,
            PUBLIC,
            WEIGHTS,
            self.noise,
            max_multiplicity=max_multiplicity,
            categories=categories,
            random_generator=random_generator,
        )
        result = {label: round(amount) for label, amount in noisy.items()}

        if self.normalize:
            total = sum(result.values())
            # A zero total leaves the rounded values untouched.
            if total:
                result = {
                    label: amount / total
                    for label, amount in result.items()
                }

        if self.sort:
            ordered = sorted(
                result.items(),
                key=lambda entry: entry[1],
                reverse=not self.ascending,
            )
            result = dict(ordered)

        return result

    def private_query(self) -> PrivateQuery:
        """Return the ``LaplaceQuery`` built from this op's noise level."""
        return LaplaceQuery(self.noise)