# Package version: 4.0.1
"""Local implementation of range detection"""
from __future__ import annotations
from typing import Literal, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
from sarus_statistics.ops.utils import (
check_is_private,
generator_from_seed,
rescale_weights,
)
# pylint: disable=too-many-arguments, too-many-locals, too-many-branches
def automatic_column_range_pandas(
    data: pd.DataFrame,
    data_col: str,
    user_col: str,
    private_col: str,
    weight_col: str,
    dtype: Literal[
        'boolean', 'float32', 'float64', 'int8', 'int16', 'int32', 'int64'
    ],
    noise: float,
    prob_no_false_positive: float = 1 - 1e-9,
    max_multiplicity: float = 1,
    estimate: Tuple[Optional[float], Optional[float]] = (None, None),
    random_generator: Optional[np.random.Generator] = None,
    is_data_already_scaled: bool = False,
) -> Tuple[float, float]:
    """Automatic bounding function from
    https://arxiv.org/pdf/1909.01917.pdf paragraph 5.1.1

    Builds power-of-two histogram bins covering the dtype's range, adds
    Laplace noise to the (weighted) bin counts, and returns the smallest
    and largest bin edges whose noisy count exceeds a threshold chosen so
    that the probability of a false positive stays below
    ``1 - prob_no_false_positive``.

    Parameters
    -----------
    data: pd.DataFrame
        column to evaluate
    data_col: str
        name of the column with values to be evaluated
    user_col: str
        name of the user column
    private_col: str
        name of the column indicating the privacy status
    weight_col: str
        name of the weight_col
    dtype:
        type of the data (from sarus_dataset.type)
    noise: float
        scale of the laplace noise
    prob_no_false_positive: float
        probability of not having
        a false positive, should be very close to 1.
    max_multiplicity: float
        maximum weight per user considered.
        values are sampled if it overflows.
    estimate: Tuple[Optional[float], Optional[float]]
        Gross estimate of the bounds.
        If None, use the min or max of the data type
        computed bounds are expected to fall inside the estimate
    random_generator: Optional[np.random.Generator]
        generator to use through the computation (for reproducibility)
    is_data_already_scaled: bool
        if True, skip the per-user weight rescaling step

    Returns
    -------
    Tuple[float, float]
        minimum and maximum. Either (noisy) closest power of two or estimate

    Raises
    -------
    ValueError
        if prob_no_false_positive not in [0,1]
    TypeError
        if dtype not in float64, float32 or int64 to int8
    """
    random_generator = (
        random_generator
        if random_generator is not None
        else generator_from_seed(None)
    )
    check_is_private(data, user_col, private_col)
    if prob_no_false_positive < 0 or prob_no_false_positive > 1:
        raise ValueError("prob_no_false_positive should be in [0,1]")
    # Degenerate estimate: both bounds given and equal, nothing to compute.
    if (
        estimate[0] == estimate[1]
        and estimate[0] is not None
        and estimate[1] is not None
    ):
        return (
            estimate[0],
            estimate[1],
        )
    if dtype == "boolean":
        return (
            0,
            1,
        )

    # Bin edges: signed powers of two spanning the representable range of
    # the dtype, with 0 in the middle.
    bins: Sequence[Union[int, float]]
    if dtype in ('float32', 'float64'):
        # 2 * max exponent of the IEEE format (127 for f32, 1023 for f64):
        # edges run from -2**half up to 2**half through subnormal scales.
        bitsize = 127 * 2 if dtype == 'float32' else 1023 * 2
        half = bitsize // 2
        bins = (
            [-(2 ** (half - b)) for b in range(bitsize)]
            + [0]
            + [2 ** (b - half) for b in range(bitsize)]
        )
    elif dtype == 'int8':
        bitsize = 8
        # NOTE(review): unlike int16/32/64 below, int8 gets no negative
        # bins, so negative int8 values never raise the noisy counts and
        # the detected minimum cannot go below 0 — confirm this is
        # intentional before changing (it affects the noise threshold).
        bins = [0] + [1 << b for b in range(bitsize)]
    elif dtype in ('int16', 'int32', 'int64'):
        bitsize = int(dtype[3:])  # 16, 32 or 64
        bins = (
            [-(1 << b) for b in reversed(range(bitsize))]
            + [0]
            + [1 << b for b in range(bitsize)]
        )
    else:
        raise TypeError(
            "dtype not implemented, please convert to float64,"
            " float32 or int64 to int8"
        )

    # Clip the bin edges to the user-provided estimate, keeping the
    # estimate values themselves as the outermost edges.
    if estimate[0] is not None:
        bins = [estimate[0]] + [b for b in bins if b > estimate[0]]
    if estimate[1] is not None:
        bins = [b for b in bins if b < estimate[1]] + [estimate[1]]

    # Remove data outside the estimate. Bug fix: the original used
    # truthiness (`if estimate[0] and estimate[1]`), which silently
    # skipped filtering when a bound was exactly 0; use `is not None`
    # for consistency with the bin clipping above.
    if estimate[0] is not None and estimate[1] is not None:
        data = data[data[data_col].between(*estimate)]
    elif estimate[0] is not None:
        data = data[data[data_col] >= estimate[0]]
    elif estimate[1] is not None:
        data = data[data[data_col] <= estimate[1]]

    # Rescale weights for private rows so no user contributes more than
    # max_multiplicity to the histogram.
    if not is_data_already_scaled:
        data = rescale_weights(
            data=data,
            user_col=user_col,
            private_col=private_col,
            weight_col=weight_col,
            max_multiplicity=max_multiplicity,
        )
    counts, _ = np.histogram(
        data[data_col].values,
        bins=bins,
        weights=data[weight_col].values,
    )
    # Threshold calibrated so that the probability that ANY of the
    # len(bins)-1 empty bins exceeds it is at most
    # 1 - prob_no_false_positive (union bound over Laplace tails).
    threshold = (
        -noise
        * max_multiplicity
        * np.log(1 - (prob_no_false_positive ** (1 / (len(bins) - 1))))
    )
    n_counts = len(counts)
    counts = counts + random_generator.laplace(
        loc=0, scale=noise * max_multiplicity, size=n_counts
    )
    # First bin (from the left) whose noisy count clears the threshold
    # gives the minimum; first from the right gives the maximum.
    try:
        minimum = bins[
            next(x for x, val in enumerate(counts) if val > threshold)
        ]
    except StopIteration:
        minimum = bins[0]
    try:
        maximum = bins[
            n_counts
            - next(
                x for x, val in enumerate(np.flip(counts)) if val > threshold
            )
        ]
    except StopIteration:
        maximum = bins[-1] - 1
    # Clamp the detected bounds back inside the estimate.
    if estimate[0] is not None:
        minimum = max(minimum, estimate[0])
    if estimate[1] is not None:
        maximum = min(maximum, estimate[1])
    return minimum, maximum