# Repository URL to install this package:
# Version: 4.5.4.dev1
import typing as t
from sarus_differential_privacy.query import EpsilonQuery, PrivateQuery
from sarus_statistics.ops.covariance.diffprivlib_cov import scale_clip
from sarus_statistics.ops.covariance.local import covariance
import numpy as np
import pandas as pd
from sarus_data_spec.constants import PUBLIC, PU_COLUMN, WEIGHTS
import sarus_data_spec.typing as st
# NOTE(review): nothing visible in this file reads this constant in code;
# estimate_max_l2_norm takes its own ``percentile`` argument (default 100).
# Confirm whether it is still used elsewhere before relying on it.
PERCENTILE = 95
class CovarianceOp:
    """Covariance operation over the boolean/numeric columns of a dataset.

    The in-memory (pandas) path is implemented here; the big-data (SQL) path
    is declared but raises ``NotImplementedError``.

    Attributes:
        epsilon: Value forwarded to ``covariance`` and wrapped in the
            ``EpsilonQuery`` returned by ``private_query`` (presumably the
            privacy budget -- confirm against sarus_differential_privacy).
        dims: Forwarded unchanged to ``covariance``; any falsy value given
            to the constructor (including 0) is stored as ``None``.
    """

    def __init__(
        self,
        dataset: st.Dataset,
        epsilon: float,
        dims: t.Optional[int] = None,
    ):
        """Store the dataset and the parameters of the operation."""
        self._dataset = dataset
        self.epsilon = epsilon
        # Falsy ``dims`` (None, 0) is normalised to None.
        self.dims = dims if dims else None

    def dataset(self) -> st.Dataset:
        """Return the dataset this operation was built on."""
        return self._dataset

    def value(
        self,
        max_multiplicity: float,
        norm: t.Optional[float] = None,
        averages: t.Optional[np.ndarray] = None,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> pd.DataFrame:
        """Compute the covariance matrix, choosing the backend by data size.

        Args:
            max_multiplicity: Forwarded to the backend implementation.
            norm: Clipping norm; estimated from the synthetic variant by the
                pandas backend when ``None``.
            averages: Per-column values subtracted before clipping;
                estimated from the synthetic variant by the pandas backend
                when ``None``.
            random_generator: Forwarded to the backend implementation.

        Returns:
            The covariance matrix as a data frame.

        Raises:
            NotImplementedError: If the dataset manager reports the dataset
                as big data (the SQL backend is not implemented).
        """
        ds = self.dataset()
        backend = self._sql if ds.manager().is_big_data(ds) else self._pandas
        return backend(max_multiplicity, norm, averages, random_generator)

    def _sql(
        self,
        max_multiplicity: float,
        norm: t.Optional[float] = None,
        averages: t.Optional[np.ndarray] = None,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> pd.DataFrame:
        """Big-data backend; not implemented, always raises."""
        raise NotImplementedError

    def _pandas(
        self,
        max_multiplicity: float,
        norm: t.Optional[float] = None,
        averages: t.Optional[np.ndarray] = None,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> pd.DataFrame:
        """In-memory backend: centre, scale/clip, then call ``covariance``.

        Missing ``norm`` / ``averages`` are estimated from the synthetic
        variant of the dataset. The result is indexed on both axes by the
        boolean/numeric column names of the dataset.
        """
        ds = self.dataset()

        # Fall back to estimates when the caller did not supply these.
        if norm is None:
            norm = estimate_max_l2_norm(ds)
        if averages is None:
            averages = estimate_averages(ds)

        # Feature columns: only boolean and numeric dtypes are kept.
        raw_df = t.cast(pd.DataFrame, ds.to(pd.DataFrame))
        data_df = raw_df.select_dtypes(include=["bool", np.number])

        # Administrative columns needed by scale_clip and covariance.
        admin_columns = [PU_COLUMN, PUBLIC, WEIGHTS]
        admin_data = ds.to_pandas()[admin_columns]

        # Centre the features. Kept as an in-place operation on purpose: the
        # result is re-aligned to data_df's own columns. A pandas Series is
        # matched by column label, a NumPy array by position.
        data_df -= averages

        clipped = scale_clip(
            data_df.to_numpy(),
            admin_data[WEIGHTS].to_numpy(),
            norm,
        )
        data_df = pd.DataFrame(
            clipped,
            columns=data_df.columns,
            index=data_df.index,
        )

        feature_names = data_df.columns
        joined = data_df.join(admin_data)
        cov_values = covariance(
            joined,
            feature_names,
            PU_COLUMN,
            PUBLIC,
            WEIGHTS,
            max_multiplicity,
            self.epsilon,
            norm,
            self.dims,
            random_generator,
        )
        return pd.DataFrame(
            cov_values,
            index=feature_names,
            columns=feature_names,
        )

    def private_query(self) -> PrivateQuery:
        """Return an ``EpsilonQuery`` built from this operation's epsilon."""
        return EpsilonQuery(self.epsilon)
def estimate_covariance(
    dataset: st.Dataset,
    scales: t.Optional[np.ndarray] = None,
) -> pd.DataFrame:
    """Compute the covariance matrix of the synthetic variant of a dataset.

    Only boolean and numeric columns are used, converted to float. Rows that
    contain a NaN in any of those columns are dropped before the remaining
    values are divided by ``scales``.

    Args:
        dataset (st.Dataset): dataset from which the synthetic variant is
            taken.
        scales (t.Optional[np.ndarray], optional): divisor applied to the
            data. Defaults to a scalar 1.0 (no scaling).

    Returns:
        pd.DataFrame: covariance matrix as returned by ``DataFrame.cov``.
    """
    divisor = np.array(1.0) if scales is None else scales

    synthetic = t.cast(
        st.Dataset,
        dataset.variant(kind=st.ConstraintKind.SYNTHETIC, public_context=[]),
    )
    frame = t.cast(pd.DataFrame, synthetic.to(pd.DataFrame))
    numeric = frame.select_dtypes(include=["bool", np.number]).astype(float)

    # Incomplete rows are discarded, not imputed.
    scaled = numeric.dropna() / divisor
    return scaled.cov()
def estimate_max_l2_norm(
    dataset: st.Dataset,
    scales: t.Optional[np.ndarray] = None,
    percentile: int = 100,
) -> float:
    """Estimate a bound on the row L2 norms from the synthetic variant.

    Only boolean and numeric columns are used, converted to float. NaNs are
    replaced with 0.0, the values are divided by ``scales``, and the
    Euclidean norm of every row is computed.

    Args:
        dataset (st.Dataset): dataset from which the synthetic variant is
            taken.
        scales (t.Optional[np.ndarray], optional): divisor applied to the
            data. Defaults to a scalar 1.0 (no scaling).
        percentile (int, optional): percentile of the row norms to return.
            Defaults to 100, i.e. the largest row norm. The module-level
            ``PERCENTILE`` constant is not consulted by this function.

    Returns:
        float: the requested percentile of the row norms (the scalar
        produced by ``np.percentile``; the cast is for type checkers only).
    """
    divisor = np.array(1.0) if scales is None else scales

    synthetic = t.cast(
        st.Dataset,
        dataset.variant(kind=st.ConstraintKind.SYNTHETIC, public_context=[]),
    )
    frame = t.cast(pd.DataFrame, synthetic.to(pd.DataFrame))
    numeric = frame.select_dtypes(include=["bool", np.number]).astype(float)

    # Missing values contribute nothing to a row's norm.
    scaled = numeric.fillna(0.0) / divisor

    norms_per_row = np.linalg.norm(scaled, axis=1)
    return t.cast(float, np.percentile(norms_per_row, percentile))
def estimate_averages(
    dataset: st.Dataset, scales: t.Optional[np.ndarray] = None
) -> np.ndarray:
    """Estimate per-column means from the synthetic variant of a dataset.

    Only boolean and numeric columns are used, converted to float. Rows that
    contain a NaN in any of those columns are dropped before the remaining
    values are divided by ``scales``.

    Args:
        dataset (st.Dataset): dataset from which the synthetic variant is
            taken.
        scales (t.Optional[np.ndarray], optional): divisor applied to the
            data. Defaults to a scalar 1.0 (no scaling).

    Returns:
        np.ndarray: the column means. At runtime this is the object returned
        by ``DataFrame.mean(axis=0)`` (a pandas Series indexed by column
        name); the cast only affects static typing.
    """
    divisor = np.array(1.0) if scales is None else scales

    synthetic = t.cast(
        st.Dataset,
        dataset.variant(kind=st.ConstraintKind.SYNTHETIC, public_context=[]),
    )
    frame = t.cast(pd.DataFrame, synthetic.to(pd.DataFrame))
    numeric = frame.select_dtypes(include=["bool", np.number]).astype(float)

    # Incomplete rows are discarded before averaging.
    scaled = numeric.dropna() / divisor
    column_means = scaled.mean(axis=0)
    return t.cast(np.ndarray, column_means)