Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import typing as t

from sarus_differential_privacy.query import (
    ComposedQuery,
    LaplaceQuery,
    PrivateQuery,
)
from sarus_statistics.ops.histograms.local import (
    dataset_length,
    private_histogram,
)
from sarus_statistics.ops.utils import group_by_results_to_dataframe
import numpy as np
import pandas as pd

from sarus_data_spec.constants import DATA, PUBLIC, PU_COLUMN, WEIGHTS
from sarus_data_spec.sarus_statistics.ops.data_utils import (
    compute_full_data_from_dataspec,
    compute_groupby_data_from_dataspec,
)
import sarus_data_spec.typing as st


class CountOp:
    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        self._dataset = dataset
        self.noise = noise

    def dataset(self) -> st.Dataset:
        return self._dataset

    def value(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        dataset = self.dataset()
        if dataset.manager().is_big_data(dataset):
            return self._sql(column_name, max_multiplicity, random_generator)
        return self._pandas(column_name, max_multiplicity, random_generator)

    def _sql(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        raise NotImplementedError

    def _pandas(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        if not column_name:
            data = t.cast(pd.DataFrame, self.dataset().to_pandas())[
                [PU_COLUMN, PUBLIC, WEIGHTS]
            ]
        else:
            data = compute_full_data_from_dataspec(self.dataset())
            data = data[[column_name, PU_COLUMN, PUBLIC, WEIGHTS]].dropna(
                subset=[column_name]
            )

        return dataset_length(
            data,
            PU_COLUMN,
            PUBLIC,
            WEIGHTS,
            self.noise,
            max_multiplicity,
            random_generator,
        )

    def private_query(self) -> PrivateQuery:
        return LaplaceQuery(self.noise)


class GroupByCountOp:
    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        self._dataset = dataset
        self.noise = noise

    def dataset(self) -> st.Dataset:
        return self._dataset

    def value(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        dataset = self.dataset()
        if dataset.manager().is_big_data(dataset):
            return self._sql(
                column_name, max_multiplicity, random_generator, keys_values
            )
        return self._pandas(
            column_name, max_multiplicity, random_generator, keys_values
        )

    def _sql(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        raise NotImplementedError

    def _pandas(
        self,
        column_name: t.Optional[str],
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """
        The pandas implementation for the groupby count operation.

        We apply a dp implementation of the count to all groups,
        using a multi-dimensional Laplace mechanism.

        See the docstring of the '_pandas' method in 'GroupBySumOp'
        for more information about the noise used for each Laplace
        mechanism in each group.
        """
        (
            groupbydata,
            keys_name,
        ) = compute_groupby_data_from_dataspec(self.dataset())
        results = []

        for key, group_data in groupbydata:
            if keys_values is None or key in keys_values:
                count = dataset_length(
                    group_data,
                    PU_COLUMN,
                    PUBLIC,
                    WEIGHTS,
                    self.noise,
                    max_multiplicity,
                    random_generator,
                )
                results.append((key, count))

        return group_by_results_to_dataframe(results, "Count", keys_name)

    def private_query(self) -> PrivateQuery:
        return ComposedQuery([LaplaceQuery(self.noise)])


class HistogramOp:
    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
        normalize: bool = False,
        sort: bool = True,
        ascending: bool = False,
        bins: t.Optional[int] = None,
        dropna: bool = True,
    ):
        self._dataset = dataset
        self.noise = noise
        self.normalize = normalize
        self.sort = sort
        self.ascending = ascending
        self.bins = bins
        self.dropna = dropna

    def dataset(self) -> st.Dataset:
        return self._dataset

    def value(
        self,
        column_name: str,
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> t.Dict[str, float]:
        dataset = self.dataset()
        if dataset.manager().is_big_data(dataset):
            return self._sql(column_name, max_multiplicity, random_generator)
        return self._pandas(column_name, max_multiplicity, random_generator)

    def _sql(
        self,
        column_name: str,
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> t.Dict[str, float]:
        raise NotImplementedError

    def _pandas(
        self,
        column_name: str,
        max_multiplicity: float,
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> t.Dict[str, float]:
        data = compute_full_data_from_dataspec(self.dataset())

        syn_data = t.cast(
            st.Dataset,
            self.dataset().variant(
                kind=st.ConstraintKind.SYNTHETIC,
                public_context=[],
            ),
        ).to_pandas()

        if DATA in syn_data.columns:
            categories = pd.DataFrame.from_records(
                syn_data[DATA].values.tolist()
            )[column_name].unique()
        else:
            categories = syn_data[column_name].unique()

        histogram = private_histogram(
            data[[column_name, PU_COLUMN, PUBLIC, WEIGHTS]],
            column_name,
            PU_COLUMN,
            PUBLIC,
            WEIGHTS,
            self.noise,
            max_multiplicity=max_multiplicity,
            categories=categories,
            random_generator=random_generator,
        )
        histogram = {key: round(value) for key, value in histogram.items()}

        if self.normalize:
            norm = sum(histogram.values())
            if norm:
                histogram = {
                    key: value / norm for key, value in histogram.items()
                }

        if self.sort:
            if self.ascending:
                histogram = {
                    k: v
                    for k, v in sorted(
                        histogram.items(), key=lambda item: item[1]
                    )
                }
            else:
                histogram = {
                    k: v
                    for k, v in sorted(
                        histogram.items(),
                        key=lambda item: item[1],
                        reverse=True,
                    )
                }

        return histogram

    def private_query(self) -> PrivateQuery:
        return LaplaceQuery(self.noise)