Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import typing as t

from sarus_differential_privacy.query import LaplaceQuery, PrivateQuery
from sarus_statistics.ops.sum.local import sum_op
from sarus_statistics.ops.utils import group_by_results_to_dataframe

import numpy as np
import pandas as pd

from sarus_data_spec.constants import PUBLIC, PU_COLUMN, WEIGHTS
from sarus_data_spec.sarus_statistics.ops.data_utils import (
    compute_rescaled_data_from_dataspec,
    compute_rescaled_groupby_data_from_dataspec,
)
import sarus_data_spec.typing as st


class SumOp:
    """Noisy sum of a single column of a dataset.

    The instance keeps the dataset and a noise parameter, and routes the
    computation either to a SQL backend or to a pandas backend.
    """

    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        """Remember the dataset to aggregate and the noise parameter.

        Args:
            dataset: dataset the sum is computed on.
            noise: value handed unchanged to ``sum_op`` and to
                ``LaplaceQuery``.
        """
        self.noise = noise
        self._dataset = dataset

    def dataset(self) -> st.Dataset:
        """Return the dataset given at construction time."""
        return self._dataset

    def value(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        """Compute the sum of ``column_name`` with the suitable backend.

        The dataset's manager is asked whether the dataset is big data:
        if so the SQL backend is used, otherwise the pandas one.

        Args:
            column_name: name of the column to sum.
            max_multiplicity: forwarded to the backend.
            bounds: pair of floats forwarded to the backend.
            random_generator: optional numpy generator forwarded to the
                backend.

        Returns:
            The result of the selected backend.

        Raises:
            NotImplementedError: when the SQL backend is selected.
        """
        source = self.dataset()
        use_sql = source.manager().is_big_data(source)
        backend = self._sql if use_sql else self._pandas
        return backend(
            column_name, max_multiplicity, bounds, random_generator
        )

    def _sql(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        """SQL backend placeholder; always raises NotImplementedError."""
        raise NotImplementedError

    def _pandas(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        """Pandas backend: rescale the data, then delegate to ``sum_op``.

        The rescaled data is obtained from
        ``compute_rescaled_data_from_dataspec`` and restricted to the
        summed column plus the PU_COLUMN, PUBLIC and WEIGHTS columns
        before being passed to ``sum_op``.
        """
        selected_columns = [column_name, PU_COLUMN, PUBLIC, WEIGHTS]
        frame = compute_rescaled_data_from_dataspec(
            self.dataset(), max_multiplicity
        )
        return sum_op(
            frame[selected_columns],
            column_name,
            PU_COLUMN,
            PUBLIC,
            WEIGHTS,
            self.noise,
            bounds,
            max_multiplicity,
            random_generator,
            # The frame comes from the rescaling helper above, so sum_op
            # is told not to scale it again.
            is_data_already_scaled=True,
        )

    def private_query(self) -> PrivateQuery:
        """Return a ``LaplaceQuery`` built from this op's noise."""
        return LaplaceQuery(self.noise)


class GroupBySumOp:
    """Noisy per-group sum of a single column of a dataset.

    The instance keeps the dataset and a noise parameter, and routes the
    computation either to a SQL backend or to a pandas backend.
    """

    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        """Remember the dataset to aggregate and the noise parameter.

        Args:
            dataset: dataset the grouped sum is computed on.
            noise: value handed unchanged to ``sum_op`` and to
                ``LaplaceQuery``.
        """
        self.noise = noise
        self._dataset = dataset

    def dataset(self) -> st.Dataset:
        """Return the dataset given at construction time."""
        return self._dataset

    def value(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """Compute the grouped sum of ``column_name``.

        The dataset's manager is asked whether the dataset is big data:
        if so the SQL backend is used, otherwise the pandas one.

        Args:
            column_name: name of the column to sum.
            max_multiplicity: forwarded to the backend.
            bounds: pair of floats forwarded to the backend.
            random_generator: optional numpy generator forwarded to the
                backend.
            keys_values: optional list of group keys to keep; ``None``
                keeps every group.

        Returns:
            The result of the selected backend.

        Raises:
            NotImplementedError: when the SQL backend is selected.
        """
        source = self.dataset()
        use_sql = source.manager().is_big_data(source)
        backend = self._sql if use_sql else self._pandas
        return backend(
            column_name,
            max_multiplicity,
            bounds,
            random_generator,
            keys_values,
        )

    def _sql(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """SQL backend placeholder; always raises NotImplementedError."""
        raise NotImplementedError

    def _pandas(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """
        Pandas backend of the groupby sum.

        A dp sum is computed for every retained group, i.e. a
        multi-dimensional Laplace mechanism is applied across groups.

        Design rationale for the per-group noise:

        The Laplace mechanism needs the L1 sensitivity (s1) of the query,
        the minimum noise being 1/epsilon * s1.

        Writing s0, s1 and s_infinite for the L0, L1 and L_infinite
        sensitivities of the query, s1 satisfies:
            s1 <= s0 * s_infinite

        - s_infinite is bounded by 'sensibility' inside the local
        implementation sum_op.
        - s0 is bounded by 'max_multiplicity'.

        Hence the intended noise per group is
                1/epsilon (=self.noise) * max_multiplicity * sensibility.

        NOTE(review): this method does not itself multiply the noise by
        'max_multiplicity'; it passes 'self.noise' and 'max_multiplicity'
        to sum_op as separate arguments. Presumably both scalings are
        applied inside sum_op - confirm against its implementation.

        Further reading:
        https://github.com/google/differential-privacy/
        blob/main/common_docs/Differential_Privacy_Computations_In_Data_Pipelines.pdf
        """
        grouped, keys_name = compute_rescaled_groupby_data_from_dataspec(
            self.dataset(), max_multiplicity
        )
        selected_columns = [column_name, PU_COLUMN, PUBLIC, WEIGHTS]
        sums_by_key = []
        for group_key, group_frame in grouped:
            # Skip groups the caller did not ask for; None keeps them all.
            if keys_values is not None and group_key not in keys_values:
                continue
            noisy_sum = sum_op(
                group_frame[selected_columns],
                column_name,
                PU_COLUMN,
                PUBLIC,
                WEIGHTS,
                self.noise,
                bounds,
                max_multiplicity,
                random_generator,
                # Groups come from the rescaling helper above, so sum_op
                # is told not to scale them again.
                is_data_already_scaled=True,
            )
            sums_by_key.append((group_key, noisy_sum))

        return group_by_results_to_dataframe(sums_by_key, "Sum", keys_name)

    def private_query(self) -> PrivateQuery:
        """Return a ``LaplaceQuery`` built from this op's noise."""
        return LaplaceQuery(self.noise)