Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import typing as t

from sarus_differential_privacy.query import EpsilonQuery, PrivateQuery
from sarus_statistics.ops.median.opendp import median_op
from sarus_statistics.ops.utils import group_by_results_to_dataframe
import numpy as np
import pandas as pd

from sarus_data_spec.constants import PUBLIC, PU_COLUMN, WEIGHTS
from sarus_data_spec.sarus_statistics.ops.data_utils import (
    compute_rescaled_data_from_dataspec,
    compute_rescaled_groupby_data_from_dataspec,
)
import sarus_data_spec.typing as st


class MedianOp:
    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        self._dataset = dataset
        self.noise = noise

    def dataset(self) -> st.Dataset:
        return self._dataset

    def value(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        dataset = self.dataset()
        if dataset.manager().is_big_data(dataset):
            return self._sql(
                column_name, max_multiplicity, bounds, random_generator
            )
        return self._pandas(
            column_name, max_multiplicity, bounds, random_generator
        )

    def _sql(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
    ) -> float:
        raise NotImplementedError

    def _pandas(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        data: t.Optional[t.Any] = None,
    ) -> float:
        rescaled_data = compute_rescaled_data_from_dataspec(
            self.dataset(), max_multiplicity
        )
        return median_op(
            rescaled_data[[column_name, PU_COLUMN, PUBLIC, WEIGHTS]],
            column_name,
            PU_COLUMN,
            PUBLIC,
            WEIGHTS,
            self.noise,
            bounds,
            max_multiplicity,
            is_data_already_scaled=True,
        )

    def private_query(self) -> PrivateQuery:
        return EpsilonQuery(1 / self.noise)


class GroupbyMedianOp:
    def __init__(
        self,
        dataset: st.Dataset,
        noise: float,
    ):
        self._dataset = dataset
        self.noise = noise

    def dataset(self) -> st.Dataset:
        return self._dataset

    def value(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        dataset = self.dataset()
        if dataset.manager().is_big_data(dataset):
            return self._sql(
                column_name,
                max_multiplicity,
                bounds,
                random_generator,
                keys_values,
            )
        return self._pandas(
            column_name,
            max_multiplicity,
            bounds,
            random_generator,
            keys_values,
        )

    def _sql(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        raise NotImplementedError

    def _pandas(
        self,
        column_name: str,
        max_multiplicity: float,
        bounds: t.Tuple[float, float],
        random_generator: t.Optional[np.random.Generator] = None,
        keys_values: t.Optional[t.List[t.Any]] = None,
    ) -> pd.DataFrame:
        """
        The pandas implementation for the groupby median operation.

        We apply a dp implementation of the median to all groups,
        using a multi-dimensional Laplace mechanism.

        See the docstring of the '_pandas' method in 'GroupBySumOp'
        for more information about the noise used for each Laplace
        mechanism in each group.
        """
        (
            rescaled_groupbydata,
            keys_name,
        ) = compute_rescaled_groupby_data_from_dataspec(
            self.dataset(), max_multiplicity
        )
        results = []
        for key, rescaled_group_data in rescaled_groupbydata:
            if keys_values is None or key in keys_values:
                dp_median = median_op(
                    rescaled_group_data[
                        [column_name, PU_COLUMN, PUBLIC, WEIGHTS]
                    ],
                    column_name,
                    PU_COLUMN,
                    PUBLIC,
                    WEIGHTS,
                    self.noise,
                    bounds,
                    max_multiplicity,
                    is_data_already_scaled=True,
                )
                results.append((key, dp_median))

        return group_by_results_to_dataframe(results, "Median", keys_name)

    def private_query(self) -> PrivateQuery:
        return EpsilonQuery(1 / self.noise)