Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""Local implementation of range detection"""
from __future__ import annotations

from typing import Literal, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd

from sarus_statistics.ops.utils import (
    check_is_private,
    generator_from_seed,
    rescale_weights,
)


# pylint: disable=too-many-arguments, too-many-locals, too-many-branches
def automatic_column_range_pandas(
    data: pd.DataFrame,
    data_col: str,
    user_col: str,
    private_col: str,
    weight_col: str,
    dtype: Literal[
        'boolean', 'float32', 'float64', 'int8', 'int16', 'int32', 'int64'
    ],
    noise: float,
    prob_no_false_positive: float = 1 - 1e-9,
    max_multiplicity: float = 1,
    estimate: Tuple[Optional[float], Optional[float]] = (None, None),
    random_generator: Optional[np.random.Generator] = None,
    is_data_already_scaled: bool = False,
) -> Tuple[float, float]:
    """Automatic bounding function from
    https://arxiv.org/pdf/1909.01917.pdf paragraph 5.1.1

    The values are histogrammed over power-of-two bin edges, Laplace noise
    is added to every bin count, and the outermost bins whose noisy count
    exceeds a threshold give the returned bounds.

    Parameters
    -----------
    data: pd.DataFrame
        dataframe holding the column to evaluate
    data_col: str
        name of the column with values to be evaluated
    user_col: str
        name of the user column
    private_col: str
        name of the column indicating the privacy status
    weight_col: str
        name of the weight column
    dtype:
        type of the data (from sarus_dataset.type)
    noise: float
        scale of the laplace noise
    prob_no_false_positive: float
        probability of not having
        a false positive, should be very close to 1.
    max_multiplicity: float
        maximum weight per user considered.
        values are sampled if it overflows.
    estimate: Tuple[Optional[float], Optional[float]]
        Gross estimate of the bounds.
        If None, use the min or max of the data type
        computed bounds are expected to fall inside the estimate
    random_generator: Optional[np.random.Generator]
        generator to use through the computation (for reproducibility)
    is_data_already_scaled: bool
        if True, `rescale_weights` is skipped and the weights found in
        `weight_col` are used as they are

    Returns
    -------
    Tuple[float, float]
        minimum and maximum. Either (noisy) closest power of two or estimate

    Raises
    -------
    ValueError
        if prob_no_false_positive not in [0,1], or if both estimates are
        given and the lower one is greater than the upper one
    TypeError
        if dtype not in float64, float32 or int64 to int8
    """
    random_generator = (
        random_generator
        if random_generator is not None
        else generator_from_seed(None)
    )
    check_is_private(data, user_col, private_col)

    if prob_no_false_positive < 0 or prob_no_false_positive > 1:
        raise ValueError("prob_no_false_positive should be in [0,1]")

    lower = estimate[0]
    upper = estimate[1]

    # A degenerate estimate already pins both bounds: nothing to compute.
    if lower is not None and upper is not None and lower == upper:
        return lower, upper

    if dtype == "boolean":
        return 0, 1

    bins = _power_of_two_bins(dtype)

    # An inverted estimate would collapse the bins to a single edge and
    # fail further down with a ZeroDivisionError; fail early and clearly.
    if lower is not None and upper is not None and lower > upper:
        raise ValueError(
            "estimate lower bound should not be greater than upper bound"
        )

    # Clip the bin edges to the estimate, the estimate becoming the
    # outermost edge on each side where it is provided.
    if lower is not None:
        bins = [lower] + [edge for edge in bins if edge > lower]
    if upper is not None:
        bins = [edge for edge in bins if edge < upper] + [upper]

    # remove data outside estimate
    # `is not None` (rather than truthiness) so that a bound of exactly 0
    # is honoured like any other value.
    values = data[data_col]
    if lower is not None and upper is not None:
        data = data[values.between(lower, upper)]
    elif lower is not None:
        data = data[values >= lower]
    elif upper is not None:
        data = data[values <= upper]

    # rescale weights for private rows
    if not is_data_already_scaled:
        data = rescale_weights(
            data=data,
            user_col=user_col,
            private_col=private_col,
            weight_col=weight_col,
            max_multiplicity=max_multiplicity,
        )
    counts, _ = np.histogram(
        data[data_col].values,
        bins=bins,
        weights=data[weight_col].values,
    )

    # Per-bin threshold derived from the requested overall probability of
    # no false positive spread across the len(bins) - 1 bins.
    threshold = (
        -noise
        * max_multiplicity
        * np.log(1 - (prob_no_false_positive ** (1 / (len(bins) - 1))))
    )
    noisy_counts = counts + random_generator.laplace(
        loc=0, scale=noise * max_multiplicity, size=len(counts)
    )

    populated = np.flatnonzero(noisy_counts > threshold)
    if populated.size:
        # Left edge of the first populated bin, right edge of the last one.
        minimum = bins[populated[0]]
        maximum = bins[populated[-1] + 1]
    else:
        # No bin passed the threshold: fall back to the outermost edges.
        # For the integer dtypes without an upper estimate, the last edge
        # is 1 << (bitsize - 1), so the "- 1" brings it back to the
        # largest power-of-two edge minus one.
        # NOTE(review): with an upper estimate this yields estimate - 1;
        # confirm that this is intended.
        minimum = bins[0]
        maximum = bins[-1] - 1

    if lower is not None:
        minimum = max(minimum, lower)
    if upper is not None:
        maximum = min(maximum, upper)

    return minimum, maximum


def _power_of_two_bins(dtype: str) -> Sequence[Union[int, float]]:
    """Return the increasing power-of-two bin edges used for `dtype`.

    Raises TypeError for any dtype other than float32, float64, int8,
    int16, int32 or int64.
    """
    # Floats: edges are +/- 2**e on both sides of 0, e spanning 2 * half
    # consecutive exponents.
    for float_name, half in (("float32", 127), ("float64", 1023)):
        if dtype == float_name:
            span = 2 * half
            return (
                [-(2 ** (half - b)) for b in range(span)]
                + [0]
                + [2 ** (b - half) for b in range(span)]
            )

    # Signed integers: edges are +/- 2**b for b < bitsize, on both sides
    # of 0. int8 uses the same symmetric layout as the wider types so that
    # negative values are not dropped by the histogram.
    for int_name, bitsize in (
        ("int8", 8),
        ("int16", 16),
        ("int32", 32),
        ("int64", 64),
    ):
        if dtype == int_name:
            return (
                [-(1 << b) for b in reversed(range(bitsize))]
                + [0]
                + [1 << b for b in range(bitsize)]
            )

    raise TypeError(
        "dtype not implemented, please convert to float64,"
        " float32 or int64 to int8"
    )