# sarus_statistics — package version 4.0.1
# (repository-page header captured during extraction; kept as a comment)
# pylint: disable=too-many-arguments
import operator
from typing import Dict, Optional, Tuple, cast
import numpy as np
import pandas as pd
from sarus_statistics.ops.utils import (
check_is_private,
generator_from_seed,
rescale_weights,
)
def error_volume(
smaller_quantile: Tuple[float, float], larger_quantile: Tuple[float, float]
) -> float:
"""Computes volume in between two quantiles
Parameters
-----------
smaller_quantile: Tuple[float,float]
quantile, and its value
larger_quantile: Tuple[float,float]
other quantile with its value
Returns
-------
float
volume inside
"""
return larger_quantile[0] - smaller_quantile[0]
def max_error_volume(quantiles: Dict[float, float]) -> Tuple[float, float]:
    """Computes the probability gap between each pair of value-adjacent
    quantiles and returns the pair where it is maximal
    Parameters
    -----------
    quantiles:Dict[float,float]
        quantile level along with the corresponding value in the dataset
    Returns
    --------
    Tuple[float,float]
        the two quantile levels whose gap is the largest
    """
    # order the (level, value) pairs by dataset value
    by_value = sorted(quantiles.items(), key=operator.itemgetter(1))
    # lazily pair each quantile with its successor and score the level gap
    scored_pairs = (
        ((lower[0], upper[0]), upper[0] - lower[0])
        for lower, upper in zip(by_value, by_value[1:])
    )
    # max keeps the first pair on ties, matching insertion order
    return max(scored_pairs, key=operator.itemgetter(1))[0]
def get_probability(
    data: pd.DataFrame,
    value: float,
    noise: float,
    data_col: str,
    weight_col: str,
    max_multiplicity: float,
    random_generator: Optional[np.random.Generator],
) -> float:
    """Given dataset, value and noise, report a noisy quantile estimate
    Parameters
    ----------
    data: pd.DataFrame
        dataset values
    value: float
        value to estimate the quantile
    noise: float
        standard deviation of the gaussian noise to add
    data_col: str
        name of the column with values to be evaluated
    weight_col: str
        name of the weight_col
    max_multiplicity: float
        maximum weight per user considered.
        values are sampled if it overflows.
    random_generator: Optional[np.random.Generator]
        generator to use through the computation (for reproducibility)
    Returns
    --------
    float
        noisy probability that a data point is <= value
    """
    if random_generator is None:
        random_generator = generator_from_seed(None)
    scale = noise * max_multiplicity
    total_weight = data[weight_col].sum()
    # weighted count of points at or below the candidate value
    below = (
        (data[data_col] <= value).values * data[weight_col].values
    ).sum()
    # draw order matters for reproducibility: below first, then above
    noisy_below: float = below + random_generator.normal(0, scale=scale)
    noisy_above: float = (
        total_weight - below + random_generator.normal(0, scale=scale)
    )
    return noisy_below / (noisy_above + noisy_below)
def add_quantile(  # pylint:disable=too-many-arguments
    current_quantiles: Dict[float, float],
    new_quantile_value: float,
    new_quantile_probability: float,
    lower_bound: float,
    upper_bound: float,
    iota: float = 0,
) -> Dict[float, float]:
    """Adds couple (new_quantile_probability, new_quantile_value) to the
    current quantile dictionary. Reorders if needed and clips to iota or
    1 - iota in order to leave the 0 and 1 quantiles unchanged
    Parameters
    ----------
    current_quantiles: Dict[float, float]
        quantiles before addition, mapping probability -> value
    new_quantile_value: float
        dataset value of the quantile to add
    new_quantile_probability: float
        estimated probability level of the quantile to add
    lower_bound, upper_bound: float, float
        interval where the new quantile was supposed to fall.
        if new_quantile_probability not in interval, we reorder the quantiles
    iota: float
        clip probabilities to (iota, 1 - iota).
        this is useful to force the 0 and 1 quantiles to be mapped to the
        expected bounds
    Returns
    --------
    Dict[float, float]
        new quantiles (``current_quantiles`` is mutated in place and
        returned, unless reordering builds a fresh dict)
    """
    # clip to [iota, 1 - iota] so the extreme quantiles keep the bounds
    new_quantile_probability = max(
        iota, min(1 - iota, new_quantile_probability)
    )
    current_quantiles[new_quantile_probability] = new_quantile_value
    if not lower_bound <= new_quantile_probability <= upper_bound:
        # the noisy estimate broke monotonicity: re-pair sorted
        # probabilities with sorted values to restore a non-decreasing
        # quantile function
        current_quantiles = dict(
            zip(
                sorted(current_quantiles),
                sorted(current_quantiles.values()),
            )
        )
    return current_quantiles
# pylint: disable=too-many-locals
def feature_quantiles(
    data: pd.DataFrame,
    data_col: str,
    user_col: str,
    private_col: str,
    weight_col: str,
    noise: float,
    sampling_ratio: Optional[float],
    nb_quantiles: int,
    bounds: Tuple[float, float],
    max_multiplicity: float,
    iota: float = 0.0,
    random_generator: Optional[np.random.Generator] = None,
) -> Dict[float, float]:
    """Computes quantiles for a given feature
    Parameters
    -----------
    data: pd.DataFrame
        dataset to compute quantiles
    data_col: str
        name of the column with values to be evaluated
    user_col: str
        name of the user column
    private_col: str
        name of the column indicating the privacy status
    weight_col: str
        name of the weight_col
    noise: float
        noise to add to each query
    sampling_ratio: Optional[float]
        sampling ratio to compute quantiles
    nb_quantiles: int
        nb of quantiles to compute. Note: there can be less quantiles
        returned in practice if two runs return the same quantile value
    bounds:
        bounds on the data distribution
    max_multiplicity: float
        maximum weight per user considered.
        values are sampled if it overflows.
    iota: float
        clip probabilities to (iota, 1 - iota).
        this is useful to force the 0 and 1 quantiles to be mapped to the
        expected bounds
    random_generator: Optional[np.random.Generator]
        generator to use through the computation (for reproducibility)
    Returns
    --------
    Dict[float,float]
        dictionary with the value for each quantile, sorted by level
    """
    check_is_private(data, user_col, private_col)
    # seed the search with the known distribution bounds
    quantiles: Dict[float, float] = {0.0: bounds[0], 1.0: bounds[1]}
    if user_col is not None:
        data = rescale_weights(
            data=data,
            user_col=user_col,
            private_col=private_col,
            weight_col=weight_col,
            max_multiplicity=max_multiplicity,
        )
    for _ in range(nb_quantiles):
        if sampling_ratio is None:
            batch = data
        else:
            batch = cast(
                pd.DataFrame,
                data.sample(frac=sampling_ratio, replace=False),
            )
        # widest probability gap marks the most uncertain region
        low_level, high_level = max_error_volume(quantiles)
        # bisect the value interval of that region
        candidate_value = (quantiles[low_level] + quantiles[high_level]) / 2
        generator = (
            random_generator
            if random_generator is not None
            else generator_from_seed(None)
        )
        candidate_probability = get_probability(
            data=batch,
            value=candidate_value,
            noise=noise,
            data_col=data_col,
            weight_col=weight_col,
            max_multiplicity=max_multiplicity,
            random_generator=generator,
        )
        quantiles = add_quantile(
            current_quantiles=quantiles,
            new_quantile_value=candidate_value,
            new_quantile_probability=candidate_probability,
            lower_bound=low_level,
            upper_bound=high_level,
            iota=iota,
        )
    # levels are unique keys, so sorting items sorts by level
    return dict(sorted(quantiles.items()))