Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import typing as t

import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy.orm import Session

# Importing the sarus data spec here creates a circular import.
# Feature flag read by rescale_weights_sql: while it is False that function
# returns the input table unchanged instead of building the rescaling query.
VIEW_ENABLED = False  # For now, make assumptions on dataset


def rescale_weights(
    data: pd.DataFrame,
    user_col: str,
    private_col: str,
    weight_col: str,
    max_multiplicity: float,
) -> pd.DataFrame:
    """Cap the L1 norm of each user's private weights at max_multiplicity.

    Rows whose `private_col` value is falsy are the private ones. For each
    user (missing users form a group of their own) the private weights are
    summed and multiplied by `max_multiplicity / sum`, capped at 1 so that
    weights are never scaled up. Other rows are left untouched.

    The input frame is not modified; a rescaled copy is returned.

    Note: unlike rescale_l2_weights, this function does not run
    check_is_private on its input.
    """
    rescaled = data.copy()
    # Private rows are those where the flag column is falsy.
    private_rows = ~rescaled[private_col].astype('bool')

    # Per-row total of the owning user's private weights; `transform` keeps
    # the index of the private rows so the result aligns row by row.
    private_groups = rescaled[private_rows].groupby(
        user_col, dropna=False, sort=False
    )
    per_user_total = private_groups[weight_col].transform('sum')

    # Shrink only: a factor above 1 is capped at 1.
    factor = (max_multiplicity / per_user_total).clip(upper=1)

    rescaled.loc[private_rows, weight_col] = (
        rescaled.loc[private_rows, weight_col] * factor
    )
    return rescaled


def rescale_l2_weights(
    data: pd.DataFrame,
    data_col: str,
    user_col: str,
    private_col: str,
    weight_col: str,
    max_multiplicity: float,
) -> pd.DataFrame:
    """Cap the L2 norm of each user's private weights at max_multiplicity.

    Rows where `private_col` is False are the private ones. For every user,
    the private weights are first summed per `data_col` value; the L2 norm
    of those per-value sums is the user's contribution. Each private weight
    is multiplied by `max_multiplicity / norm`, capped at 1 so weights are
    never scaled up. Other rows are left untouched.

    The input is validated with check_is_private first and is not modified;
    a rescaled copy is returned.
    """
    check_is_private(data, user_col, private_col)
    rescaled = data.copy()
    private_rows = ~rescaled[private_col]

    # Total weight per (user, value) pair; missing values keep their own
    # group because of dropna=False.
    per_value_total = (
        rescaled[private_rows]
        .groupby([user_col, data_col], dropna=False)[weight_col]
        .sum()
    )
    # Square the totals and flatten the (user, value) index back to columns
    # so they can be regrouped by user alone.
    squared_totals = (per_value_total ** 2).reset_index()
    l2_norm = np.sqrt(squared_totals.groupby(user_col)[weight_col].sum())
    factors = (max_multiplicity / l2_norm).clip(upper=1)

    if isinstance(factors, pd.DataFrame):
        # Defensive: collapse a DataFrame result to a Series. A result that
        # is already a Series is deliberately not squeezed.
        factors = factors.squeeze()
    factor_by_user = factors.to_dict()

    private_users = rescaled.loc[private_rows, user_col]
    # `.values` drops the index so the product is positional, not aligned.
    per_row_factor = private_users.apply(
        lambda user: factor_by_user[user]
    ).values
    rescaled.loc[private_rows, weight_col] = (
        rescaled.loc[private_rows, weight_col] * per_row_factor
    )
    return rescaled


def check_is_private(
    data: pd.DataFrame, user_col: str, private_col: str
) -> None:
    """Check that every private row of a dataset has a privacy unit.

    A row counts as private when its `private_col` value is False (the
    mask used below is `~data[private_col]`). Such a row must have a
    non-null value in `user_col`.

    Raises:
        ValueError: if at least one private row has a null `user_col`.
    """
    missing_user = data[user_col].isnull()
    private_rows = ~data[private_col]
    # Reduce the boolean mask directly instead of materialising the
    # offending rows just to count them.
    if (missing_user & private_rows).any():
        raise ValueError(  # is ValueError the right choice ?
            "A row in the dataset is set to private but has "
            "privacy_unit = None"
        )


def generator_from_seed(seed: t.Optional[int] = None) -> np.random.Generator:
    """Return a NumPy random generator initialised from `seed`.

    `seed` (including None) is passed unchanged to np.random.default_rng.
    """
    rng = np.random.default_rng(seed)
    return rng


# SQL utils
def laplace_mechanism(
    sqlalchemy_expr: sa.sql.ColumnElement,
    noise: float,
) -> sa.sql.ColumnElement:
    """
    Return `sqlalchemy_expr` with Laplace noise of parameter `noise` added.

    The noise term is built in SQL from the backend's `random()` as
    `-noise * log(1 - 2 * |u - 0.5|) * sign(u - 0.5)`.

    NOTE(review): `log` is emitted as the backend's own LOG function; some
    engines define single-argument LOG as base 10 rather than the natural
    logarithm - confirm for the target dialect.
    NOTE(review): if `random()` can return exactly 0 the log argument is 0 -
    confirm how the target dialect handles that.
    """
    uniform = sa.func.random()
    centered = uniform - 0.5
    log_term = sa.func.log(1 - 2 * sa.func.abs(centered))
    return sqlalchemy_expr - noise * log_term * sa.func.sign(centered)


def gaussian_mechanism(
    sqlalchemy_expr: sa.sql.ColumnElement,
    noise: float,
) -> sa.sql.ColumnElement:
    """
    Return `sqlalchemy_expr` with Gaussian noise of parameter `noise` added.

    The noise term is built in SQL from the backend's `random()` as
    `noise * sqrt(-2 * log(u)) * cos(2 * pi * u)`, with pi obtained as
    `acos(-1.0)`.

    NOTE(review): this has the shape of a Box-Muller transform, which needs
    two independent uniform draws. Presumably each `random()` occurrence in
    the rendered SQL is evaluated separately - confirm for the target
    dialect.
    NOTE(review): `log` is emitted as the backend's own LOG function; some
    engines define single-argument LOG as base 10 rather than the natural
    logarithm - confirm for the target dialect.
    """
    uniform = sa.func.random()
    radius = sa.func.sqrt(-2 * sa.func.log(uniform))
    angle = 2 * sa.func.acos(-1.0) * uniform
    return sqlalchemy_expr + noise * radius * sa.func.cos(angle)


def rescale_weights_sql(
    session: Session,
    table: sa.sql.FromClause,
    data_col: str,
    user_col: str,
    private_col: str,
    weight_col: str,
    max_multiplicity: float,
) -> sa.sql.Alias:
    """Rescale data weights so each user's private contribution is at max
    max_multiplicity in L1 norm

    For now, we make the supposition that max_multiplicity is always 1 in big
    data, so no need to clip: while the module-level VIEW_ENABLED flag is
    False, the table is returned unchanged as a subquery exposing every
    column under its own key.

    When VIEW_ENABLED is True, the result is a subquery with the columns
    `data_col`, `user_col`, `private_col` and `weight_col` made of:

    * the rows where `private_col` is true, with their weight unchanged;
    * the rows where `private_col` is false, with their weight multiplied by
      `least(1, max_multiplicity / sum of that user's private weights)`.

    Private rows are attached to their factor with an inner join on
    `user_col`, so a private row whose user matches no factor row is not
    part of the result.
    """
    if not VIEW_ENABLED:
        # Pass-through: same columns, same names, no rescaling.
        return t.cast(
            sa.sql.Alias,
            session.query(
                *[col.label(key) for key, col in table.c.items()]
            ).subquery(),
        )

    data_c = getattr(table.c, data_col)
    user_c = getattr(table.c, user_col)
    private_c = getattr(table.c, private_col)
    weight_c = getattr(table.c, weight_col)

    # Total private weight per user.
    user_groupby = (
        session.query(
            user_c,
            sa.func.sum(weight_c).label(weight_col),
        )
        .where(private_c.is_(False))
        .group_by(user_c)
        .subquery()
    )

    # Per-user scaling factor, capped at 1 so weights are never scaled up.
    # The column is labelled explicitly so that it can be referenced by a
    # known key rather than by an implicitly derived function name.
    factor_name = "scaling_factor"
    user_weights = session.query(
        getattr(user_groupby.c, user_col),
        sa.func.least(
            1.0,
            sa.cast(max_multiplicity, sa.Float())
            / getattr(user_groupby.c, weight_col),
        ).label(factor_name),
    ).subquery()
    factor_c = getattr(user_weights.c, factor_name)

    # Private rows: weight multiplied by the owning user's factor.
    scaled_private_data = (
        session.query(
            data_c.label(data_col),
            user_c.label(user_col),
            private_c.label(private_col),
            (weight_c * factor_c).label(weight_col),
        )
        .where(private_c.is_(False))
        .join(
            user_weights,
            user_c == getattr(user_weights.c, user_col),
        )
    )

    # Remaining rows keep their weight as is. They must not reference
    # `user_weights`: doing so without a join would cross-join every such
    # row with every user's factor.
    public_data = session.query(
        data_c.label(data_col),
        user_c.label(user_col),
        private_c.label(private_col),
        weight_c.label(weight_col),
    ).where(private_c)

    return t.cast(
        sa.sql.Alias,
        public_data.union_all(scaled_private_data).subquery(),
    )


def group_by_results_to_dataframe(
    results: t.List, name_col: str, keys_name: t.Union[t.List[t.Any], t.Any]
) -> pd.DataFrame:
    """Turn group-by results into a DataFrame indexed by the group keys.

    `results` holds one entry per group. When the first element of the
    first entry is a tuple, every entry is read as `(key_tuple, value)` and
    the key tuple is spread over one column per key; otherwise the entries
    are used as rows unchanged.

    `keys_name` names the key column(s) (a list for several keys, anything
    else for a single one) and `name_col` names the value column. The key
    columns become the index of the returned frame.
    """
    if len(results) == 0 or not isinstance(results[0][0], tuple):
        rows = list(results)
    else:
        rows = [(*key, value) for key, value in results]

    key_columns = keys_name if isinstance(keys_name, list) else [keys_name]
    all_columns = key_columns + [name_col]

    frame = pd.DataFrame(rows, columns=all_columns)
    return frame.set_index(all_columns[:-1])