Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
sarus_statistics / sarus_statistics / ops / utils_opendp.py
Size: Mime:
import typing as t
from json import dumps

import numpy as np
from opendp.combinators import make_default_user_transformation
from opendp.transformations import Transformation
from opendp.typing import (
    AllDomain,
    String,
    SymmetricDistance,
    VectorDomain,
    f64,
)


def rescale_weights(
    data: np.ndarray,
    users: np.ndarray,
    weights: np.ndarray,
    is_public: np.ndarray,
    max_multiplicity: float,
) -> np.ndarray:
    """Cap each user's private weight at max_multiplicity and resample data.

    For every user, the weights of their private rows are summed; when that
    sum exceeds ``max_multiplicity`` those weights are scaled down so the sum
    equals ``max_multiplicity``. Public rows keep their weight. Rows are then
    duplicated/sampled according to the resulting weights via
    ``sample_indices_from_weights``.

    The ``weights`` argument is NOT modified: the computation runs on a float
    copy, so the function can be called repeatedly with the same arrays.

    Args:
        data: Rows to resample, aligned with ``users``/``weights``.
        users: User identifier of each row.
        weights: Non-negative weight of each row.
        is_public: Truthy where the row is public (not subject to the cap).
        max_multiplicity: Maximum total private weight allowed per user.

    Returns:
        The rows of ``data`` selected by the sampled indices.
    """
    # Coerce to bool first: on a 0/1 integer mask `~` would be a bitwise NOT
    # and produce negative integers instead of a boolean mask.
    private_mask = ~np.asarray(is_public, dtype=bool)
    user_ids = np.asarray(users)

    # Work on a float copy so the caller's array is left untouched and so an
    # integer-typed weights array can be scaled by fractional factors.
    rescaled = np.array(weights, dtype=float)

    # Sum of private weights per user; `user_index` maps each private row to
    # its user's slot in `user_totals`.
    _, user_index = np.unique(user_ids[private_mask], return_inverse=True)
    user_totals = np.bincount(user_index, weights=rescaled[private_mask])

    # Equivalent to min(max_multiplicity / total, 1) for non-negative totals,
    # but only divides where the cap is exceeded, so a zero total neither
    # warns nor yields NaN.
    scaling_factors = np.ones_like(user_totals)
    over_budget = user_totals > max_multiplicity
    scaling_factors[over_budget] = max_multiplicity / user_totals[over_budget]

    rescaled[private_mask] *= scaling_factors[user_index]

    sampled_indices = sample_indices_from_weights(rescaled)

    # np.asarray keeps ndarray inputs as-is and lets list inputs be indexed
    # with an index array (presumably OpenDP may hand the callback a plain
    # list, as make_groupby_user's np.array(x) suggests — TODO confirm).
    return t.cast(np.ndarray, np.asarray(data)[sampled_indices])


def sample_indices_from_weights(weights: np.ndarray) -> np.ndarray:
    """Turn per-row weights into a list of row indices.

    Row ``i`` appears ``floor(weights[i])`` times, plus once more with
    probability equal to the fractional part of ``weights[i]``, so its
    expected number of occurrences is ``weights[i]``. All the deterministic
    repetitions come first in the result, followed by the randomly kept rows.
    """
    n_rows = len(weights)
    row_ids = np.arange(n_rows)

    # Whole part: guaranteed number of copies of each row.
    whole = np.floor(weights).astype(int)
    # Leftover in [0, 1): probability of one additional copy.
    remainder = weights - whole

    # One uniform draw per row decides whether the extra copy is kept.
    draws = np.random.rand(n_rows)

    repeated = np.repeat(row_ids, whole)
    extra = row_ids[draws < remainder]
    return t.cast(np.ndarray, np.concatenate((repeated, extra)))


def make_weight_data(
    users: np.ndarray,
    weights: np.ndarray,
    is_public: np.ndarray,
    max_multiplicity: float,
) -> Transformation:
    """Build an OpenDP transformation that reweights its input data.

    The transformation's function forwards the incoming data to
    ``rescale_weights`` together with the ``users``, ``weights``,
    ``is_public`` and ``max_multiplicity`` given here. Its stability map
    multiplies the input distance by ``max_multiplicity``.
    """

    def reweight(rows):  # type: ignore[no-untyped-def]
        # The per-row metadata is captured from the enclosing call.
        return rescale_weights(
            rows, users, weights, is_public, max_multiplicity
        )

    def scale_distance(d_in):  # type: ignore[no-untyped-def]
        return d_in * max_multiplicity

    input_domain = VectorDomain[AllDomain[f64]]
    output_domain = VectorDomain[AllDomain[f64]]
    return make_default_user_transformation(
        reweight,
        stability_map=scale_distance,
        DI=input_domain,
        DO=output_domain,
        MI=SymmetricDistance,
        MO=SymmetricDistance,
    )


def make_groupby_user(users: t.Iterable) -> Transformation:
    """Groupby users in OpenDP in order to get user-level privacy.

    Builds a transformation whose function splits the incoming data into one
    group per distinct user and returns each group as a JSON-encoded list.

    Args:
        users: User identifier of each input row, aligned with the data the
            transformation will receive. Any iterable is accepted.

    Returns:
        An OpenDP transformation from a vector of floats to a vector of
        strings, one JSON string per distinct user.
    """
    # np.array on a generator or other unsized iterable produces a 0-d object
    # array, so materialize such inputs before converting.
    if not hasattr(users, "__len__"):
        users = list(users)
    user_ids = np.array(users)

    # The distinct users do not depend on the data: compute them once here
    # rather than on every invocation of the transformation.
    distinct_users = set(user_ids)

    def group_rows(x):  # type: ignore[no-untyped-def]
        # Convert the input once per call instead of once per user.
        rows = np.array(x)
        return [
            dumps(rows[user_ids == user].tolist()) for user in distinct_users
        ]

    return make_default_user_transformation(
        group_rows,
        # NOTE(review): the output distance is a constant 1 regardless of the
        # input distance — confirm this is the intended stability map.
        stability_map=lambda x: 1,
        DI=VectorDomain[AllDomain[f64]],
        DO=VectorDomain[
            AllDomain[String]
        ],  # we can't use Vecs so we serialize
        MI=SymmetricDistance,
        MO=SymmetricDistance,
    )