Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import typing as t

from sarus_differential_privacy.query import EpsilonQuery, PrivateQuery
from sarus_statistics.ops.covariance.diffprivlib_cov import scale_clip
from sarus_statistics.ops.covariance.local import covariance
import numpy as np
import pandas as pd

from sarus_data_spec.constants import PUBLIC, PU_COLUMN, WEIGHTS
import sarus_data_spec.typing as st

# Percentile level (95).
# NOTE(review): no code visible in this file reads this constant;
# `estimate_max_l2_norm` takes its own `percentile` argument, which defaults
# to 100. Confirm where PERCENTILE is consumed.
PERCENTILE = 95


class CovarianceOp:
    """Covariance operation bound to a dataset and a privacy budget.

    The instance keeps the dataset, the ``epsilon`` value and an optional
    ``dims`` setting. ``value`` produces the covariance matrix as a pandas
    data frame and ``private_query`` wraps ``epsilon`` in an
    ``EpsilonQuery``.
    """

    def __init__(
        self,
        dataset: st.Dataset,
        epsilon: float,
        dims: t.Optional[int] = None,
    ):
        """Store the inputs of the operation.

        Args:
            dataset: dataset the covariance is computed on.
            epsilon: value forwarded to ``covariance`` and wrapped by
                ``private_query``.
            dims: value forwarded to ``covariance``. Any falsy value
                (``None`` or ``0``) is stored as ``None``.
        """
        self._dataset = dataset
        self.epsilon = epsilon
        # Falsy values (None, 0) collapse to None.
        self.dims = dims if dims else None

    def dataset(self) -> st.Dataset:
        """Return the dataset given at construction time."""
        return self._dataset

    def value(
        self,
        max_multiplicity: float,
        norm: t.Optional[float] = None,
        averages: t.Optional[np.ndarray] = None,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> pd.DataFrame:
        """Compute the covariance matrix with the appropriate backend.

        The dataset manager decides, through ``is_big_data``, whether the
        SQL path (``_sql``) or the in-memory path (``_pandas``) is used.
        All arguments are forwarded unchanged to the selected backend.

        Raises:
            NotImplementedError: when the SQL backend is selected.
        """
        target = self.dataset()
        is_big = target.manager().is_big_data(target)
        backend = self._sql if is_big else self._pandas
        return backend(max_multiplicity, norm, averages, random_generator)

    def _sql(
        self,
        max_multiplicity: float,
        norm: t.Optional[float] = None,
        averages: t.Optional[np.ndarray] = None,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> pd.DataFrame:
        """SQL backend; not implemented, always raises."""
        raise NotImplementedError

    def _pandas(
        self,
        max_multiplicity: float,
        norm: t.Optional[float] = None,
        averages: t.Optional[np.ndarray] = None,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> pd.DataFrame:
        """Compute the covariance matrix in memory with pandas.

        Args:
            max_multiplicity: forwarded to ``covariance``.
            norm: forwarded to ``scale_clip`` and ``covariance``. When
                ``None`` it is obtained from ``estimate_max_l2_norm``.
            averages: subtracted from the feature columns. When ``None``
                they are obtained from ``estimate_averages``.
            random_generator: forwarded to ``covariance``.

        Returns:
            Data frame indexed and labelled on both axes by the boolean
            and numeric columns of the dataset.
        """
        source = self.dataset()

        # Missing bounds are estimated before the data are materialised.
        if norm is None:
            norm = estimate_max_l2_norm(source)
        if averages is None:
            averages = estimate_averages(source)

        # Feature columns: boolean and numeric dtypes only.
        features = t.cast(pd.DataFrame, source.to(pd.DataFrame))
        features = features.select_dtypes(include=["bool", np.number])

        # Administrative columns needed by scale_clip and covariance.
        admin = source.to_pandas()[[PU_COLUMN, PUBLIC, WEIGHTS]]

        # Centre the features in place.
        features -= averages

        # NOTE(review): scale_clip presumably rescales/clips rows with
        # respect to `norm` using the weights; confirm in its module.
        clipped_values = scale_clip(
            features.to_numpy(), admin[WEIGHTS].to_numpy(), norm
        )
        clipped = pd.DataFrame(
            clipped_values,
            columns=features.columns,
            index=features.index,
        )
        feature_names = clipped.columns
        joined = clipped.join(admin)

        matrix = covariance(
            joined,
            feature_names,
            PU_COLUMN,
            PUBLIC,
            WEIGHTS,
            max_multiplicity,
            self.epsilon,
            norm,
            self.dims,
            random_generator,
        )
        return pd.DataFrame(
            matrix, index=feature_names, columns=feature_names
        )

    def private_query(self) -> PrivateQuery:
        """Return an ``EpsilonQuery`` built from this op's ``epsilon``."""
        return EpsilonQuery(self.epsilon)


def estimate_covariance(
    dataset: st.Dataset,
    scales: t.Optional[np.ndarray] = None,
) -> pd.DataFrame:
    """Return the covariance matrix of the synthetic variant of dataset.

    Only boolean and numeric columns of the synthetic data are kept and
    they are converted to float. Rows containing a NaN are dropped, the
    remaining values are divided by ``scales`` and the covariance of the
    resulting columns is computed with ``DataFrame.cov``.

    Args:
        dataset (st.Dataset): dataset from which we take the synthetic
            variant.
        scales (t.Optional[np.ndarray], optional): divisor applied to the
            data. Defaults to None, in which case the data are divided
            by 1.0.

    Returns:
        pd.DataFrame: pandas data frame with covariance matrix
    """
    divisor = np.array(1.0) if scales is None else scales

    synthetic_variant = dataset.variant(
        kind=st.ConstraintKind.SYNTHETIC,
        public_context=[],
    )
    synthetic_ds = t.cast(st.Dataset, synthetic_variant)

    frame = t.cast(pd.DataFrame, synthetic_ds.to(pd.DataFrame))
    numeric = frame.select_dtypes(include=["bool", np.number])
    as_float = numeric.astype(float)

    # Incomplete rows are discarded before scaling.
    scaled = as_float.dropna() / divisor
    return scaled.cov()


def estimate_max_l2_norm(
    dataset: st.Dataset,
    scales: t.Optional[np.ndarray] = None,
    percentile: int = 100,
) -> float:
    """Estimate a bound on the row l2 norm from the synthetic variant.

    Boolean and numeric columns of the synthetic data are converted to
    float, NaNs are replaced with 0.0 and the values are divided by
    ``scales``. The l2 norm of every row is computed and the
    ``percentile``-th percentile of these norms is returned.

    Args:
        dataset (st.Dataset): dataset from which we take the synthetic
            variant.
        scales (t.Optional[np.ndarray], optional): divisor applied to the
            data. Defaults to None, in which case the data are divided
            by 1.0.
        percentile (int, optional): percentile of the row norms to
            return. Defaults to 100, i.e. the largest row norm; the
            module-level ``PERCENTILE`` constant is not the default.

    Returns:
        float: the requested percentile of the row norms.
    """
    divisor = np.array(1.0) if scales is None else scales

    synthetic_variant = dataset.variant(
        kind=st.ConstraintKind.SYNTHETIC,
        public_context=[],
    )
    synthetic_ds = t.cast(st.Dataset, synthetic_variant)

    frame = t.cast(pd.DataFrame, synthetic_ds.to(pd.DataFrame))
    as_float = frame.select_dtypes(include=["bool", np.number]).astype(
        float
    )

    # Missing entries contribute nothing to the row norm.
    scaled = as_float.fillna(0.0) / divisor

    norms_per_row = np.linalg.norm(scaled, axis=1)
    return t.cast(float, np.percentile(norms_per_row, percentile))


def estimate_averages(
    dataset: st.Dataset, scales: t.Optional[np.ndarray] = None
) -> np.ndarray:
    """Estimate per-column averages from the synthetic variant of dataset.

    Boolean and numeric columns of the synthetic data are converted to
    float, rows containing a NaN are dropped and the values are divided
    by ``scales`` before the column-wise mean is taken.

    Args:
        dataset (st.Dataset): dataset from which we take the synthetic
            variant.
        scales (t.Optional[np.ndarray], optional): divisor applied to the
            data. Defaults to None, in which case the data are divided
            by 1.0.

    Returns:
        np.ndarray: the column means. The object is the result of
        ``DataFrame.mean(axis=0)`` and is only cast for typing purposes,
        so it is not converted to a numpy array here.
    """
    divisor = np.array(1.0) if scales is None else scales

    synthetic_variant = dataset.variant(
        kind=st.ConstraintKind.SYNTHETIC,
        public_context=[],
    )
    synthetic_ds = t.cast(st.Dataset, synthetic_variant)

    frame = t.cast(pd.DataFrame, synthetic_ds.to(pd.DataFrame))
    as_float = frame.select_dtypes(include=["bool", np.number]).astype(
        float
    )

    # Incomplete rows are discarded before scaling and averaging.
    scaled = as_float.dropna() / divisor
    column_means = scaled.mean(axis=0)
    return t.cast(np.ndarray, column_means)