Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# pylint:disable=invalid-name

# coding=utf-8
# Copyright 2021 The Google Research Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# From https://github.com/google-research/google-research/tree/master/dp_multiq
# Paper: https://arxiv.org/pdf/2102.08244.pdf

"""JointExp method for computing multiple dp quantiles."""

from typing import Dict, Optional, cast

import numpy as np
from numpy.fft import irfft, rfft
from scipy import special


def racing_sample(
    log_terms: np.ndarray, random_generator: np.random.Generator
) -> np.signedinteger:
    """Numerically stable method for sampling from an exponential distribution.

    Args:
        log_terms: Array of terms of form log(coefficient) - (exponent term).

    Returns:
        A sample from the exponential distribution determined by terms. See
        Algorithm 1 from the paper "Duff: A Dataset-Distance-Based
        Utility Function Family for the Exponential Mechanism"
        (https://arxiv.org/pdf/2010.04235.pdf) for details; each element of
        terms is analogous to a single log(lambda(A_k)) - (eps * k/2) in their
        algorithm.
    """
    return np.argmin(
        np.log(np.log(1.0 / random_generator.uniform(size=log_terms.shape)))
        - log_terms
    )


def compute_intervals(
    sorted_data: np.ndarray, data_low: float, data_high: float
) -> np.ndarray:
    """Returns array of intervals of adjacent points.

    Args:
        sorted_data: Nondecreasing array of data points, all in the [data_low,
            data_high] range.
        data_low: Lower bound for data.
        data_high: Upper bound for data.

    Returns:
        An array of intervals of adjacent points from [data_low, data_high] in
        nondecreasing order. For example, if sorted_data = [0,1,1,2,3],
        data_low = 0, and data_high = 4, returns
        [[0, 0], [0, 1], [1, 1], [1, 2], [2, 3], [3, 4]].
    """
    return cast(
        np.ndarray,
        np.block(
            [
                [data_low, sorted_data],
                [sorted_data, data_high],
            ]
        ).transpose(),
    ).astype(float)


def compute_log_phi(
    data_intervals: np.ndarray,
    weights: np.ndarray,
    qs: np.ndarray,
    eps: float,
    swap: bool,
    max_multiplicity: float,
) -> np.ndarray:
    """Computes two-dimensional array log_phi.

    Args:
        data_intervals: Array of intervals of adjacent points from
            compute_intervals.
        qs: Increasing array of quantiles in [0,1].
        eps: Privacy parameter epsilon.
        swap: If true, uses swap dp sensitivity, otherwise uses add-remove.

    Returns:
        Array log_phi where log_phi[i-i',j] = log(phi(i, i', j)).
    """
    if swap:
        sensitivity = 2.0
    else:
        if len(qs) == 1:
            sensitivity = 2.0 * (1 - cast(float, min(qs[0], 1 - qs[0])))
        else:
            sensitivity = 2.0 * (
                1
                - min(
                    qs[0],
                    np.min(qs[1:] - qs[:-1]),
                    1 - qs[-1],
                )
            )
    eps_term = -(eps / (2.0 * sensitivity * max_multiplicity))
    gaps = np.block([0, np.cumsum(weights)])
    target_ns = (np.block([qs, 1]) - np.block([0, qs])) * np.sum(weights)
    return cast(np.ndarray, eps_term * np.abs(gaps.reshape(-1, 1) - target_ns))


def logdotexp_toeplitz_lt(c: np.ndarray, x: np.ndarray) -> np.ndarray:
    """Multiplies a log-space vector by a lower triangular Toeplitz matrix.

    Args:
        c: First column of the Toeplitz matrix (in log space).
        x: Vector to be multiplied (in log space).

    Returns:
        Let T denote the lower triangular Toeplitz matrix whose first column
        is given by exp(c); then the vector returned by this function is
        log(T * exp(x)). The multiplication is done using FFTs for efficiency,
        and care is taken to avoid overflow during exponentiation.
    """
    max_c, max_x = np.max(c), np.max(x)
    exp_c, exp_x = cast(np.ndarray, c - max_c), cast(np.ndarray, x - max_x)
    np.exp(exp_c, out=exp_c)
    np.exp(exp_x, out=exp_x)
    n = len(x)
    # Choose the next power of two.
    p = np.power(2, np.ceil(np.log2(2 * n - 1))).astype(int)
    fft_exp_c = rfft(exp_c, n=p)
    fft_exp_x = rfft(exp_x, n=p)
    y = cast(
        np.ndarray,
        irfft(fft_exp_c * fft_exp_x)[:n],
    )
    np.maximum(0, y, out=y)
    np.log(y, out=y)
    y += max_c + max_x
    return y


def compute_log_alpha(
    data_intervals: np.ndarray, log_phi: np.ndarray, qs: np.ndarray
) -> np.ndarray:
    """Computes three-dimensional array log_alpha.

    Args:
        data_intervals: Array of intervals of adjacent points from
            compute_intervals.
        log_phi: Array from compute_log_phi.
        qs: Increasing array of quantiles in (0,1).

    Returns:
        Array log_alpha[a, b, c] where a and c index over quantiles and b
        represents interval repeats.
    """
    num_intervals = len(data_intervals)
    num_quantiles = len(qs)
    data_intervals_log_sizes = np.log(
        data_intervals[:, 1] - data_intervals[:, 0]
    )
    log_alpha = np.log(np.zeros([num_quantiles, num_intervals, num_quantiles]))
    log_alpha[0, :, 0] = log_phi[:, 0] + data_intervals_log_sizes
    # A handy mask for log_phi.
    disallow_repeat = np.zeros(num_intervals)
    disallow_repeat[0] = -np.inf
    for j in range(1, num_quantiles):
        log_hat_alpha = special.logsumexp(log_alpha[j - 1, :, :], axis=1)
        log_alpha[j, :, 0] = data_intervals_log_sizes + logdotexp_toeplitz_lt(
            log_phi[:, j], log_hat_alpha
        )
        log_alpha[j, 0, 0] = -np.inf  # Correct possible numerical error.
        log_alpha[j, :, 1 : j + 1] = (
            (log_phi[0, j] + data_intervals_log_sizes)[:, np.newaxis]
            + log_alpha[j - 1, :, 0:j]
            - np.log(np.arange(1, j + 1) + 1)
        )
    return log_alpha


def sample_joint_exp(
    log_alpha: np.ndarray,
    data_intervals: np.ndarray,
    log_phi: np.ndarray,
    qs: np.ndarray,
    random_generator: np.random.Generator,
) -> np.ndarray:
    """Given log_alpha and log_phi, samples final quantile estimates.

    Args:
        log_alpha: Array from compute_log_alpha.
        data_intervals: Array of intervals of adjacent points from
            compute_intervals.
        log_phi: Array from compute_log_phi.
        qs: Increasing array of quantiles in (0,1).

    Returns:
        Array outputs where outputs[i] is the quantile estimate corresponding
        to quantile q[i].
    """
    num_intervals = len(data_intervals)
    num_quantiles = len(qs)
    outputs = np.zeros(num_quantiles)
    last_i = num_intervals - 1
    j = num_quantiles - 1
    repeats = 0
    while j >= 0:
        log_dist = (
            log_alpha[j, : last_i + 1, :]
            + log_phi[: last_i + 1, j + 1][::-1, np.newaxis]
        )
        # Prevent repeats unless it's the first round.
        if j < num_quantiles - 1:
            log_dist[last_i, :] = -np.inf
        # pylint: disable=unbalanced-tuple-unpacking
        i, k = np.unravel_index(
            racing_sample(log_dist, random_generator),
            [last_i + 1, num_quantiles],
        )
        repeats += k
        k += 1
        for j2 in range(j - k + 1, j + 1):
            outputs[j2] = random_generator.uniform(
                data_intervals[i, 0], data_intervals[i, 1]
            )
        j -= k
        last_i = i
    return np.sort(outputs)


# pylint: disable=too-many-arguments
def joint_exp(
    sorted_data: np.ndarray,
    weights: np.ndarray,
    data_low: float,
    data_high: float,
    qs: np.ndarray,
    eps: float,
    swap: bool,
    max_multiplicity: float,
    rho: float = 1e-3,
    random_generator: Optional[np.random.Generator] = None,
) -> np.ndarray:
    """Computes eps-differentially private quantile estimates for qs.

    Args:
        sorted_data: Array of data points sorted in increasing order.
        weights: weights of each data point
        data_low: Lower bound for data.
        data_high: Upper bound for data.
        qs: Increasing array of quantiles in (0,1).
        eps: Privacy parameter epsilon.
        swap: If true, uses swap dp sensitivity, otherwise uses add-remove.
        rho: add small uniform noise to fix degenerate cases with a lot of
            equal values

    Returns:
        Array o where o[i] is the quantile estimate corresponding to quantile
        q[i].
    """
    random_generator = (
        random_generator
        if random_generator is not None
        else np.random.default_rng(random_generator)
    )
    data = np.sort(
        np.clip(
            sorted_data
            + (
                (data_high - data_low)
                * random_generator.uniform(
                    -rho / 2, rho / 2, sorted_data.shape
                )
            ),
            data_low,
            data_high,
        )
    )
    data_intervals = compute_intervals(data, data_low, data_high)
    log_phi = compute_log_phi(
        data_intervals, weights, qs, eps, swap, max_multiplicity
    )
    log_alpha = compute_log_alpha(data_intervals, log_phi, qs)
    return sample_joint_exp(
        log_alpha, data_intervals, log_phi, qs, random_generator
    )


def missed_points_metric(
    sorted_data: np.ndarray, quantile_result: Dict[float, float]
) -> int:
    """Metric for quantiles"""
    quantiles_probs = list(quantile_result.keys())
    quantiles_values = list(quantile_result.values())
    real_quantiles = np.quantile(sorted_data, quantiles_probs)

    return sum(
        [
            cast(
                int,
                np.abs(
                    np.searchsorted(sorted_data, output)
                    - np.searchsorted(sorted_data, dp_output)
                ),
            )
            for output, dp_output in zip(real_quantiles, quantiles_values)
        ]
    )