# Packaged from repository version 4.0.1.
"""
Weighted version of the diffprivlib covariance implementation
See https://github.com/IBM/differential-privacy-library/blob/main/diffprivlib/
models/utils.py
"""
import typing as t
from numbers import Integral
import numpy as np
from diffprivlib.mechanisms import Bingham, LaplaceBoundedDomain
from diffprivlib.utils import check_random_state
from scipy.linalg import null_space
EPS = 1e-8
def covariance_eig(
    array: np.ndarray,
    weights: np.ndarray,
    epsilon: float = 1.0,
    dims: t.Optional[int] = None,
    eigvals_only: bool = False,
    random_state: t.Optional[
        t.Union[int, np.random.mtrand.RandomState]
    ] = None,
) -> t.Union[np.ndarray, t.Tuple[np.ndarray, np.ndarray]]:
    r"""
    Return the eigenvalues and eigenvectors of the covariance matrix of
    `array`, satisfying differential privacy.
    Paper link: http://papers.nips.cc/paper/
    9567-differentially-private-covariance-estimation.pdf
    Parameters
    ----------
    array : array-like, shape (n_samples, n_features)
        Matrix for which the covariance matrix is sought. Every row must
        have weighted norm at most 1 (e.g. after `normalize`/`scale_clip`).
    weights : array-like, shape (n_samples,)
        Per-sample weights used for the weighted row-norm bound check.
    epsilon : float, default: 1.0
        Privacy parameter :math:`\epsilon`.
    dims : int, optional
        Number of eigenvectors and eigenvalues to return.
        If `None`, return all eigenvectors.
    eigvals_only : bool, default: False
        Only return the eigenvalue estimates. If True, all the privacy
        budget is spent on estimating the eigenvalues.
    random_state : int or RandomState, optional
        Controls the randomness of the model. To obtain a deterministic
        behaviour during randomisation, ``random_state`` has to be fixed to
        an integer.
    Returns
    -------
    w : (dims) array
        The eigenvalues, each repeated according to its multiplicity.
        Returned alone when ``eigvals_only`` is True.
    v : (n_features, dims) array
        The normalized (unit "length") eigenvectors, such that the column
        ``v[:,i]`` is the eigenvector corresponding to the eigenvalue
        ``w[i]``.
    Raises
    ------
    TypeError
        If `dims` is not integer-valued.
    ValueError
        If `dims` is negative, or a weighted row norm exceeds 1 (+ EPS).
    """
    random_state = check_random_state(random_state)
    n_features = array.shape[1]
    # Validate `dims` BEFORE using it in min(): otherwise a non-numeric
    # value raises an unhelpful TypeError from min() itself.
    if dims is not None:
        if not isinstance(dims, Integral):
            raise TypeError(
                "Number of requested dimensions must be integer-valued, "
                f"got {type(dims)}"
            )
        if dims < 0:
            raise ValueError(
                f"Number of requested dimensions must be non-negative, got {dims}"
            )
    dims = n_features if dims is None else min(dims, n_features)
    # The privacy analysis requires bounded row norms; raise a real error
    # instead of `assert`, which is stripped when running under -O.
    max_norm = weighted_norm(array, weights, axis=1).max()
    if max_norm > 1 + EPS:
        raise ValueError(
            f"Rows of `array` must have weighted norm at most 1, got {max_norm}"
        )
    X = array.T
    C = X.dot(X.T)
    # Distributing budget as in the corollary 2 of section 3.1.
    epsilon_0 = epsilon if eigvals_only else epsilon / 2.0
    eigvals = np.linalg.eigvalsh(C)[::-1]  # eigvalsh is ascending; flip
    mech_eigvals = LaplaceBoundedDomain(
        epsilon=epsilon_0,
        lower=0,
        upper=float("inf"),
        sensitivity=2,
        random_state=random_state,
    )
    noisy_eigvals = np.array(
        [mech_eigvals.randomise(eigval) for eigval in eigvals]
    )
    if eigvals_only:
        return noisy_eigvals
    if dims == 0:
        # No eigenvectors requested: return an empty basis instead of
        # dividing the per-vector budget by zero below.
        return noisy_eigvals[:0], np.zeros((n_features, 0))
    # Each eigenvector estimate gets an equal share of the remaining half
    # of the budget.
    epsilon_i = epsilon / (2.0 * dims)
    # When estimating all eigenvectors, we don't need to spend budget for the
    # dth vector
    cov_i = C.copy()
    proj_i = np.eye(n_features)
    theta = np.zeros((0, n_features))
    mech_cov = Bingham(
        epsilon=epsilon_i, sensitivity=1, random_state=random_state
    )
    for _ in range(dims):
        if cov_i.size > 1:
            u_i = mech_cov.randomise(cov_i)
        else:
            u_i = np.ones((1,))
        # Lift the direction found in the projected subspace back into the
        # original feature space.
        theta_i = proj_i.T.dot(u_i)
        theta = np.vstack((theta, theta_i))
        if cov_i.size > 1:
            # Restrict the scatter matrix to the orthogonal complement of
            # the directions found so far.
            proj_i = null_space(theta).T
            cov_i = proj_i.dot(C).dot(proj_i.T)
    return noisy_eigvals[:dims], theta.T
def weighted_norm(
    data: np.ndarray, weights: np.ndarray, axis: t.Optional[int] = None
) -> t.Union[float, np.ndarray]:
    """Compute the weighted Euclidean norm of `data`.

    Parameters
    ----------
    data : array-like, shape (n_samples, n_features)
        Rows whose weighted norms are sought.
    weights : array-like, shape (n_samples,)
        Non-negative per-sample weights; broadcast across each row.
    axis : int, optional
        Passed to ``np.sum``. With ``axis=1`` the per-row weighted norms
        are returned as an array; with ``axis=None`` (default) a single
        scalar norm over all entries is returned.

    Returns
    -------
    float or np.ndarray
        ``sqrt(sum(weights * data**2, axis=axis))`` — a scalar for
        ``axis=None``, an array otherwise.  (The previous annotation
        claimed ``float`` unconditionally, which was wrong for axis=1.)
    """
    # Reshape to a column so weights broadcast across features of each row.
    weights = weights.reshape((len(weights), 1))
    return np.sqrt(np.sum(weights * data * data, axis=axis))
def normalize(
    data: np.ndarray, weights: np.ndarray, max_norm: float
) -> np.ndarray:
    """Shrink each row of `data` so its weighted norm is at most `max_norm`.

    Rows whose weighted norm already lies within `max_norm` are returned
    unchanged (their scale factor is capped at 1); longer rows are scaled
    down onto the `max_norm` sphere.

    Parameters
    ----------
    data : array-like, shape (n_samples, n_features)
        Rows to rescale.
    weights : array-like, shape (n_samples,)
        Per-sample weights defining the weighted norm.
    max_norm : float
        Upper bound imposed on each row's weighted norm.

    Returns
    -------
    np.ndarray
        A rescaled copy of `data`; the input is not modified.
    """
    weight_col = weights.reshape((len(weights), 1))
    row_norms = np.sqrt(np.sum(weight_col * data * data, axis=1))
    shrink = np.minimum(1, max_norm / row_norms)
    return data * shrink[:, np.newaxis]
def scale_clip(
    data: np.ndarray, weights: np.ndarray, max_norm: float
) -> np.ndarray:
    """It scales the data such that the max norm for each row is at most 1.
    Rows with norm greater than max norm are scaled by their norm instead of
    max norm.
    """
    # Replace NaNs first so they cannot poison the norm computation.
    cleaned = np.nan_to_num(data, nan=0.0)
    weight_col = weights.reshape((len(weights), 1))
    row_norms = np.sqrt(np.sum(weight_col * cleaned * cleaned, axis=1))
    # Dividing by max(norm, max_norm) caps every resulting weighted row
    # norm at 1: short rows shrink by max_norm, long rows by their norm.
    divisors = np.maximum(row_norms, max_norm)
    return cleaned / divisors[:, None]