# Source: sarus_statistics/ops/covariance/diffprivlib_cov.py
# (package-hosting page boilerplate removed so the module parses)
"""
Weighted version of the diffprivlib covariance implementation
See https://github.com/IBM/differential-privacy-library/blob/main/diffprivlib/
models/utils.py
"""

import typing as t
from numbers import Integral

import numpy as np
from diffprivlib.mechanisms import Bingham, LaplaceBoundedDomain
from diffprivlib.utils import check_random_state
from scipy.linalg import null_space

EPS = 1e-8


def covariance_eig(
    array: np.ndarray,
    weights: np.ndarray,
    epsilon: float = 1.0,
    dims: t.Optional[int] = None,
    eigvals_only: bool = False,
    random_state: t.Optional[
        t.Union[int, np.random.mtrand.RandomState]
    ] = None,
) -> t.Union[np.ndarray, t.Tuple[np.ndarray, np.ndarray]]:
    r"""
    Return the eigenvalues and eigenvectors of the weighted covariance
    matrix of `array`, satisfying differential privacy.

    Paper link: http://papers.nips.cc/paper/
    9567-differentially-private-covariance-estimation.pdf

    Parameters
    ----------
    array : array-like, shape (n_samples, n_features)
        Matrix for which the covariance matrix is sought.  Every row must
        have weighted norm at most 1 (checked below).

    weights : array-like, shape (n_samples,)
        Per-sample weights used when computing the weighted row norms.

    epsilon : float, default: 1.0
        Privacy parameter :math:`\epsilon`.

    dims : int, optional
        Number of eigenvectors and eigenvalues to return.
        If `None`, return all eigenvectors.

    eigvals_only : bool, default: False
        Only return the eigenvalue estimates.  If True, all the privacy
        budget is spent on estimating the eigenvalues.

    random_state : int or RandomState, optional
        Controls the randomness of the model.  To obtain a deterministic
        behaviour during randomisation, ``random_state`` has to be fixed to
        an integer.

    Returns
    -------
    w : (dims) array
        The eigenvalues, each repeated according to its multiplicity.
        When ``eigvals_only`` is True, this is the sole return value.

    v : (n_features, dims) array
        The normalized (unit "length") eigenvectors, such that the column
        ``v[:,i]`` is the eigenvector corresponding to the eigenvalue
        ``w[i]``.
    """

    random_state = check_random_state(random_state)

    n_features = array.shape[1]
    # Validate the caller-supplied value *before* clamping it: previously
    # `min(dims, n_features)` ran first, so a non-numeric `dims` failed
    # inside `min` with an opaque error instead of the message below.
    if dims is not None:
        if not isinstance(dims, Integral):
            raise TypeError(
                "Number of requested dimensions must be integer-valued, "
                f"got {type(dims)}"
            )
        if dims < 0:
            raise ValueError(
                f"Number of requested dimensions must be non-negative, got {dims}"
            )
    dims = n_features if dims is None else min(dims, n_features)

    max_norm = weighted_norm(array, weights, axis=1).max()
    # NOTE(review): `assert` is stripped under `python -O`; kept as-is so
    # callers relying on AssertionError are unaffected.
    assert max_norm <= 1 + EPS

    X = array.T
    C = X.dot(X.T)
    # Distributing budget as in the corollary 2 of section 3.1.
    epsilon_0 = epsilon if eigvals_only else epsilon / 2.0
    # `max(dims, 1)` guards the previous ZeroDivisionError when dims == 0;
    # in that case the eigenvector loop below runs zero times, so the
    # per-vector budget epsilon_i is never actually consumed.
    epsilon_i = epsilon / (2.0 * max(dims, 1))

    eigvals = np.linalg.eigvalsh(C)[::-1]

    # Eigenvalues are randomised with a Laplace mechanism bounded to the
    # valid domain [0, inf).
    mech_eigvals = LaplaceBoundedDomain(
        epsilon=epsilon_0,
        lower=0,
        upper=float("inf"),
        sensitivity=2,
        random_state=random_state,
    )
    noisy_eigvals = np.array(
        [mech_eigvals.randomise(eigval) for eigval in eigvals]
    )

    if eigvals_only:
        return noisy_eigvals

    # When estimating all eigenvectors, we don't need to spend budget for the
    # dth vector
    cov_i = C.copy()
    proj_i = np.eye(n_features)

    theta = np.zeros((0, n_features))
    mech_cov = Bingham(
        epsilon=epsilon_i, sensitivity=1, random_state=random_state
    )

    for _ in range(dims):
        if cov_i.size > 1:
            u_i = mech_cov.randomise(cov_i)
        else:
            # 1x1 covariance: the only unit eigenvector is trivial.
            u_i = np.ones((1,))

        # Lift the eigenvector estimate back into the original space.
        theta_i = proj_i.T.dot(u_i)
        theta = np.vstack((theta, theta_i))

        if cov_i.size > 1:
            # Deflate: project the covariance onto the orthogonal
            # complement of the eigenvectors found so far.
            proj_i = null_space(theta).T
            cov_i = proj_i.dot(C).dot(proj_i.T)

    return noisy_eigvals[:dims], theta.T


def weighted_norm(
    data: np.ndarray, weights: np.ndarray, axis: t.Optional[int] = None
) -> t.Union[float, np.ndarray]:
    """Weighted Euclidean norm of `data`: sqrt(sum(weights * data**2)).

    Returns a scalar when ``axis`` is None and an array of norms otherwise
    (e.g. one norm per row for ``axis=1``, which is how both call sites in
    this module use it) — the previous ``float`` annotation/`t.cast` was
    wrong for the array case.
    """
    # Reshape to a column vector so weights broadcast across features.
    weights = weights.reshape((len(weights), 1))
    return np.sqrt(np.sum(weights * data * data, axis=axis))


def normalize(
    data: np.ndarray, weights: np.ndarray, max_norm: float
) -> np.ndarray:
    """Rescale each row of `data` to weighted norm at most `max_norm`.

    Rows already within the bound keep their values (the scale factor is
    capped at 1); rows above it are shrunk onto the `max_norm` sphere.
    """
    row_norms = weighted_norm(data, weights, axis=1)
    n_rows = len(t.cast(t.Sized, row_norms))
    # Per-row shrink factor, capped at 1 so small rows are untouched.
    factors = np.minimum(1, max_norm / row_norms)
    scaled = data * factors.reshape((n_rows, 1))
    return t.cast(np.ndarray, scaled)


def scale_clip(
    data: np.ndarray, weights: np.ndarray, max_norm: float
) -> np.ndarray:
    """Scale rows of `data` so every weighted row norm is at most 1.

    Each row is divided by ``max(row_norm, max_norm)``: a row whose norm
    exceeds ``max_norm`` is divided by its own norm, every other row by
    ``max_norm``.  NaN entries are zeroed beforehand.
    """
    cleaned = np.nan_to_num(data, nan=0.0)
    row_norms = weighted_norm(cleaned, weights, axis=1)
    # Broadcast one divisor per row over that row's features.
    divisors = np.maximum(row_norms, max_norm)[:, None]
    return t.cast(np.ndarray, cleaned / divisors)