# Repository URL to install this package:
# Version: 4.0.1
import typing as t
from json import dumps
import numpy as np
from opendp.combinators import make_default_user_transformation
from opendp.transformations import Transformation
from opendp.typing import (
AllDomain,
String,
SymmetricDistance,
VectorDomain,
f64,
)
def rescale_weights(
    data: np.ndarray,
    users: np.ndarray,
    weights: np.ndarray,
    is_public: np.ndarray,
    max_multiplicity: float,
) -> np.ndarray:
    """Rescale data weights so each user's private contribution is at most
    ``max_multiplicity`` in OpenDP, then resample rows accordingly.

    Args:
        data: Array of rows to resample.
        users: Per-row user identifiers (same length as ``data``).
        weights: Per-row sampling weights (same length as ``data``).
            Not modified; a scaled copy is used internally.
        is_public: Boolean mask; ``True`` marks public rows exempt from
            rescaling.
        max_multiplicity: Cap on the total weight of any single user's
            private rows.

    Returns:
        ``data`` with rows duplicated/sampled according to the rescaled
        weights.
    """
    # Work on a float copy: this function is invoked from a closure that
    # captures `weights` (see make_weight_data), so mutating the caller's
    # array in place would compound the rescaling on every invocation.
    weights = weights.astype(float, copy=True)
    # Invert the is_public array to work with private entries
    is_private = ~is_public
    # Sum of weights per user, restricted to private entries
    _, inverse_indices = np.unique(users[is_private], return_inverse=True)
    user_weights_sum = np.bincount(
        inverse_indices, weights=weights[is_private]
    )
    # Per-user scaling factor: shrink only users exceeding the cap. Users
    # with a zero weight sum keep factor 1 (nothing to rescale); the
    # `where=` guard avoids a divide-by-zero RuntimeWarning while yielding
    # the same factor the original inf/min path produced.
    ratios = np.divide(
        max_multiplicity,
        user_weights_sum,
        out=np.ones_like(user_weights_sum),
        where=user_weights_sum > 0,
    )
    scaling_factors = np.minimum(ratios, 1)
    # Apply the per-user factors to the private rows' weights
    weights[is_private] *= scaling_factors[inverse_indices]
    # Duplicate/sample rows according to the rescaled weights
    sampled_indices = sample_indices_from_weights(weights)
    # Return the rescaled data using the sampled indices
    return t.cast(np.ndarray, data[sampled_indices])
def sample_indices_from_weights(weights: np.ndarray) -> np.ndarray:
    """Return row indices where row ``i`` appears ``floor(weights[i])``
    times plus one extra time with probability ``weights[i] % 1``, so
    that in expectation each row is represented ``weights[i]`` times."""
    row_ids = np.arange(len(weights))
    whole = np.floor(weights).astype(int)
    frac = weights - whole
    # Deterministic part: repeat each index by the integer weight.
    repeated = np.repeat(row_ids, whole)
    # Stochastic remainder: keep index i with probability frac[i].
    draws = np.random.rand(len(weights))
    extra = row_ids[frac > draws]
    return t.cast(np.ndarray, np.concatenate([repeated, extra]))
def make_weight_data(
    users: np.ndarray,
    weights: np.ndarray,
    is_public: np.ndarray,
    max_multiplicity: float,
) -> Transformation:
    """Build an OpenDP user transformation that reweights the data.

    The transformation rescales per-user private weights (capped at
    ``max_multiplicity``) and resamples the input vector; its stability
    map scales input distances by ``max_multiplicity``.
    """

    def _reweight(x):
        # Closure over the weighting metadata captured at build time.
        return rescale_weights(x, users, weights, is_public, max_multiplicity)

    def _stability(d_in):
        # A change of d_in rows can affect at most
        # d_in * max_multiplicity output rows.
        return d_in * max_multiplicity

    return make_default_user_transformation(
        _reweight,
        stability_map=_stability,
        DI=VectorDomain[AllDomain[f64]],
        DO=VectorDomain[AllDomain[f64]],
        MI=SymmetricDistance,
        MO=SymmetricDistance,
    )
def make_groupby_user(users: t.Iterable) -> Transformation:
    """Group rows by user in OpenDP in order to get user-level privacy.

    Each user's rows are collected into one JSON-serialized list (the
    output domain is String because we can't use Vecs — see the DO
    comment below). Adding or removing one user changes exactly one
    output element, so the stability map is the constant 1.
    """
    users = np.array(users)
    # np.unique gives a sorted, run-to-run deterministic user order.
    # The previous set(users) iteration order can vary between runs
    # (e.g. with string hash randomization), which made the output
    # ordering unstable across executions.
    unique_users = np.unique(users)
    return make_default_user_transformation(
        lambda x: [
            dumps((np.array(x)[users == user]).tolist())
            for user in unique_users
        ],
        stability_map=lambda x: 1,
        DI=VectorDomain[AllDomain[f64]],
        DO=VectorDomain[
            AllDomain[String]
        ],  # we can't use Vecs so we serialize
        MI=SymmetricDistance,
        MO=SymmetricDistance,
    )