Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# pylint: disable=too-many-arguments
import operator
from typing import Dict, Optional, Tuple, cast

import numpy as np
import pandas as pd

from sarus_statistics.ops.utils import (
    check_is_private,
    generator_from_seed,
    rescale_weights,
)


def error_volume(
    smaller_quantile: Tuple[float, float], larger_quantile: Tuple[float, float]
) -> float:
    """Probability mass between two consecutive quantiles.

    Each argument is a ``(probability, value)`` pair; the "volume" is the
    difference between the two probabilities, i.e. the share of the
    distribution lying between the two quantile values.

    Parameters
    -----------
    smaller_quantile: Tuple[float, float]
        (probability, value) pair with the smaller value
    larger_quantile: Tuple[float, float]
        (probability, value) pair with the larger value

    Returns
    -------
    float
        probability mass enclosed between the two quantiles
    """
    larger_probability, _ = larger_quantile
    smaller_probability, _ = smaller_quantile
    return larger_probability - smaller_probability


def max_error_volume(quantiles: Dict[float, float]) -> Tuple[float, float]:
    """Find the pair of value-consecutive quantiles with the largest gap.

    The quantiles are ordered by their data value; for each consecutive
    pair the enclosed probability mass is the difference of their
    probabilities. The pair holding the most mass is returned (the first
    such pair in case of ties).

    Parameters
    -----------
    quantiles: Dict[float, float]
        mapping from probability to the corresponding value in the dataset

    Returns
    --------
    Tuple[float, float]
        probabilities of the two consecutive quantiles whose enclosed
        mass is the largest
    """
    # Order the (probability, value) pairs by value.
    ordered = sorted(quantiles.items(), key=operator.itemgetter(1))
    # Probability mass between each pair of value-consecutive quantiles.
    gaps = [
        ((lower[0], upper[0]), upper[0] - lower[0])
        for lower, upper in zip(ordered, ordered[1:])
    ]
    widest_pair, _ = max(gaps, key=operator.itemgetter(1))
    return widest_pair


def get_probability(
    data: pd.DataFrame,
    value: float,
    noise: float,
    data_col: str,
    weight_col: str,
    max_multiplicity: float,
    random_generator: Optional[np.random.Generator],
) -> float:
    """Noisy estimate of the probability that a data point is <= value.

    Two independent Gaussian draws perturb the weighted count below
    ``value`` and the weighted count above it; the reported probability
    is the noisy "below" share of their sum.

    Parameters
    ----------
    data: pd.DataFrame
        dataset values
    value: float
        value at which to estimate the quantile probability
    noise: float
        standard deviation of the gaussian noise to add
    data_col: str
        name of the column with values to be evaluated
    weight_col: str
        name of the weight column
    max_multiplicity: float
        maximum weight per user considered.
        values are sampled if it overflows.
    random_generator: Optional[np.random.Generator]
        generator to use through the computation (for reproducibility);
        a fresh one is created when None

    Returns
    --------
    float
        noisy probability estimate for the given value
    """
    generator = (
        generator_from_seed(None) if random_generator is None else random_generator
    )
    scale = noise * max_multiplicity

    total_weight = data[weight_col].sum()
    below_weight = (
        (data[data_col] <= value).values * data[weight_col].values
    ).sum()

    # NB: draw order matters for reproducibility with a seeded generator —
    # the "below" perturbation is drawn first, then the "above" one.
    noisy_below: float = below_weight + generator.normal(0, scale=scale)
    noisy_above: float = (
        total_weight - below_weight + generator.normal(0, scale=scale)
    )
    return noisy_below / (noisy_above + noisy_below)


def add_quantile(  # pylint:disable=too-many-arguments
    current_quantiles: Dict[float, float],
    new_quantile_value: float,
    new_quantile_probability: float,
    lower_bound: float,
    upper_bound: float,
    iota: float = 0,
) -> Dict[float, float]:
    """Adds couple (new_quantile_probability, new_quantile_value) to the
    current quantile dictionary and returns the result, without mutating
    the input. The probability is clipped to [iota, 1 - iota] in order to
    leave the 0 and 1 quantiles unchanged; if it falls outside the
    expected interval, the mapping is re-sorted to stay monotonic.

    Parameters
    ----------
    current_quantiles: Dict[float, float]
        quantiles before addition; left unmodified
    new_quantile_value, new_quantile_probability: float, float
        pair (probability: value) to add to current_quantiles
    lower_bound, upper_bound: float, float
        interval where the new quantile was supposed to fall.
        if new_quantile_probability not in interval, we reorder the quantiles
    iota: float
        clip probabilities to (iota, 1 - iota).
        this is useful to force the 0 and 1 quantiles to be mapped to the
        expected bounds

    Returns
    --------
    Dict[float, float]
        new quantiles
    """
    # Clip to [iota, 1 - iota].
    new_quantile_probability = max(iota, min(1 - iota, new_quantile_probability))
    # Copy so the caller's dictionary is not mutated as a side effect.
    quantiles = dict(current_quantiles)
    quantiles[new_quantile_probability] = new_quantile_value
    if (
        new_quantile_probability < lower_bound
        or new_quantile_probability > upper_bound
    ):
        # Monotonic repair: re-pair sorted probabilities with sorted values.
        quantiles = dict(zip(sorted(quantiles), sorted(quantiles.values())))
    return quantiles


# pylint: disable=too-many-locals
def feature_quantiles(
    data: pd.DataFrame,
    data_col: str,
    user_col: str,
    private_col: str,
    weight_col: str,
    noise: float,
    sampling_ratio: Optional[float],
    nb_quantiles: int,
    bounds: Tuple[float, float],
    max_multiplicity: float,
    iota: float = 0.0,
    random_generator: Optional[np.random.Generator] = None,
) -> Dict[float, float]:
    """Computes quantiles for a given feature

    Parameters
    -----------
    data: pd.DataFrame
        dataset to compute quantiles
    data_col: str
        name of the column with values to be evaluated
    user_col: str
        name of the user column
    private_col: str
        name of the column indicating the privacy status
    weight_col: str
        name of the weight_col
    noise: float
        noise to add to each query
    sampling_ratio: Optional[float]
        sampling ratio to compute quantiles
    nb_quantiles: int
        nb of quantiles to compute. Note: there can be less quantiles
        returned in practice if two runs return the same quantile value
    bounds:
        bounds on the data distribution
    max_multiplicity: float
        maximum weight per user considered.
        values are sampled if it overflows.
    iota: float
        clip probabilities to (iota, 1 - iota).
        this is useful to force the 0 and 1 quantiles to be mapped to the
        expected bounds
    random_generator: Optional[np.random.Generator]
        generator to use through the computation (for reproducibility)

    Returns
    --------
    Dict[float,float]
        dictionary with the value for each quantile
    """
    check_is_private(data, user_col, private_col)

    # Resolve the generator once: the original rebuilt a fresh generator
    # on every loop iteration when None was given, wasting work; a single
    # stream is now used for the whole run.
    if random_generator is None:
        random_generator = generator_from_seed(None)

    # The 0 and 1 quantiles are pinned to the declared distribution bounds.
    quantiles = {0.0: bounds[0], 1.0: bounds[1]}

    if user_col is not None:
        data = rescale_weights(
            data=data,
            user_col=user_col,
            private_col=private_col,
            weight_col=weight_col,
            max_multiplicity=max_multiplicity,
        )

    for _ in range(nb_quantiles):
        current_data = cast(
            pd.DataFrame,
            (
                data.sample(frac=sampling_ratio, replace=False)
                if sampling_ratio is not None
                else data
            ),
        )
        # Bisect the interval currently enclosing the most probability mass.
        lower_bound, upper_bound = max_error_volume(quantiles)
        new_quantile_value = (
            quantiles[lower_bound] + quantiles[upper_bound]
        ) / 2  # midpoint where the next quantile is probed
        new_quantile_probability = get_probability(
            data=current_data,
            value=new_quantile_value,
            noise=noise,
            data_col=data_col,
            weight_col=weight_col,
            max_multiplicity=max_multiplicity,
            random_generator=random_generator,
        )
        quantiles = add_quantile(
            current_quantiles=quantiles,
            new_quantile_value=new_quantile_value,
            new_quantile_probability=new_quantile_probability,
            lower_bound=lower_bound,
            upper_bound=upper_bound,
            iota=iota,
        )
    # Return the quantiles ordered by probability.
    return dict(sorted(quantiles.items()))