Repository URL to install this package:
|
Version:
4.0.1 ▾
|
"""Differentially Private Expectation-Maximization as in Park,from Park,
DP-EM: Differentially Private Expectation Maximization,
https://arxiv.org/pdf/1605.06995.pdf"""
from typing import Optional, cast
import numpy as np
from scipy.special import logsumexp
from scipy.stats import multivariate_normal, wishart
from sarus_statistics.ops.utils import generator_from_seed
class DPDGaussianMixture: # pylint:disable=too-many-instance-attributes
"""Differentially Private Gaussian Mixture based on the paper from Park,
DP-EM: Differentially Private Expectation Maximization
Parameters
----------
n_mixtures: int
number of mixtures to fit
epsilon: float
epsilon to consume for each parameter update
max_determinant: float
maximal value for determinant of the precision matrix
in 1d, corresponds to the minimal variance
max_iter: int
number of expectation-maximisation iterations
dimension: int
dimension of each gaussian
prior_alpha: Optioanl[int]
prior on the number of observations
used for the prior covariance matrix
prior_beta: Optional[np.ndarray]
prior on the non normalized covariance
matrix
prior_mean: Optional[np.ndarray]
prior on the mean
prior_nu: int
prior on the number of observations
used to estimate the mean
"""
def __init__( # pylint:disable=too-many-arguments,
self,
n_mixtures: int,
epsilon: float,
dimension: int,
max_determinant: float = 1e6,
max_iter: int = 30,
prior_alpha: Optional[int] = None,
prior_beta: Optional[np.ndarray] = None,
prior_nu: int = 0,
prior_mean: Optional[np.ndarray] = None,
random_generator: Optional[np.random.Generator] = None,
):
self.random_generator = (
random_generator
if random_generator is not None
else generator_from_seed(None)
)
self.n_mixtures = n_mixtures
self.epsilon = epsilon
self.max_determinant = max_determinant
self.max_iter = max_iter
self.dimension = dimension
self.prior_alpha = (
self.dimension + 2 if prior_alpha is None else prior_alpha
)
self.prior_beta = (
self.prior_alpha
* np.repeat(
0.25 * np.eye(self.dimension)[np.newaxis, :],
self.n_mixtures,
axis=0,
)
if prior_beta is None
else prior_beta
)
self.prior_mean = (
np.zeros(shape=(self.n_mixtures, self.dimension))
if prior_mean is None
else prior_mean
)
self.prior_nu = prior_nu
def fit( # pylint:disable=attribute-defined-outside-init
self,
data: np.ndarray,
) -> None:
"""Fits a gaussian mixture to the data by applying Expectation-Maximization
Parameters
-----------
data: np.ndarray
data to fit
Raises
-------
ValueError
if abs(data)>1
Returns
---------
PrivateQuery
protobuf with all private queries
"""
if np.amax(np.abs(data)) > 1:
raise ValueError("This method requires L1 bounded data")
self._initialise_params(
data=data, random_generator=self.random_generator
)
likes = []
for _ in range(1, self.max_iter + 1):
responsabilities = self._expectation_step(data)
self._maximization_step(
data=data, responsabilities=responsabilities
)
likes.append(self.log_likelihood(data))
self.like = likes
def _initialise_params( # pylint:disable=attribute-defined-outside-init
self, data: np.ndarray, random_generator: np.random.Generator
) -> None:
"""Initialise the weights at random, then compute initial means
and variances for the EM algorithm
Parameters
-----------
data: np.ndarray
data to fit
"""
responsabilities = random_generator.random(
size=(data.shape[0], self.n_mixtures)
)
responsabilities /= responsabilities.sum(axis=1)[:, np.newaxis]
n_k = responsabilities.sum(axis=0)
self.weights = n_k / data.shape[0]
means = np.dot(responsabilities.T, data) / n_k[:, np.newaxis]
non_normalized_covariances = np.matmul(
responsabilities.T[:, np.newaxis]
* np.transpose(
data[np.newaxis]
- means[
:,
np.newaxis,
],
[0, 2, 1],
),
data[np.newaxis]
- means[
:,
np.newaxis,
],
)
self.exponential_mechanism(n_k, means, non_normalized_covariances)
def _expectation_step(self, data: np.ndarray) -> np.ndarray:
"""Computes responsabilities for each latent variable
Parameters
-----------
data: np.ndarray
data to fit
Returns
-------
np.ndarray
array of mixtures weights for each datapoint
"""
responsabilities = np.array(
[
weight
* multivariate_normal(
mean=mean, cov=covariance, allow_singular=True
).pdf(data)
for weight, mean, covariance in zip(
self.weights, self.means, self.covariances
)
]
).transpose()
responsabilities = responsabilities / (
responsabilities.sum(axis=1, keepdims=True)
)
return responsabilities
def log_likelihood(self, data: np.ndarray) -> np.ndarray:
"""Computes loglikelihood of the data with the current
gaussian mixtures
Parameters
-----------
data:np.ndarray
data to compute the log likelihood
Returns
-------
np.ndarray
log likelihood
"""
single = (
-self.dimension * np.log(2 * np.pi) / 2
- 0.5 * np.log(np.linalg.det(self.covariances)).reshape(1, -1)
- 0.5
* np.matmul(
np.transpose(
(
data[
:,
np.newaxis,
]
- self.means[np.newaxis, :, :]
)[:, :, :, np.newaxis],
[0, 1, 3, 2],
),
np.matmul(
np.linalg.inv(self.covariances),
(
data[
:,
np.newaxis,
]
- self.means[np.newaxis, :, :]
)[:, :, :, np.newaxis],
),
).squeeze()
+ np.log(self.weights).reshape(1, -1)
)
log_like: np.ndarray = np.sum(logsumexp(single, axis=1))
return log_like
def _maximization_step(
self, data: np.ndarray, responsabilities: np.ndarray
) -> None:
"""Computes differentially private maximization_step
Parameters
-----------
data: np.ndarray
data to fit
responsabilities: np.ndarray
array of mixtures weights for each datapoint
"""
n_k = responsabilities.sum(axis=0)
means = np.dot(responsabilities.T, data) / n_k[:, np.newaxis]
non_normalized_covariances = np.matmul(
responsabilities.T[:, np.newaxis]
* np.transpose(
data[np.newaxis]
- means[
:,
np.newaxis,
],
[0, 2, 1],
),
data[np.newaxis]
- means[
:,
np.newaxis,
],
)
self.exponential_mechanism(n_k, means, non_normalized_covariances)
self._update_weights(
n_k,
n_samples=data.shape[0],
random_generator=self.random_generator,
)
def _update_weights( # pylint:disable=attribute-defined-outside-init
self,
n_k: np.ndarray,
n_samples: int,
random_generator: np.random.Generator,
) -> None:
"""Updates weights by adding laplace noise
Parameters
----------
n_k: np.ndarray
number of elements for each mixture output by the maximization step
n_samples: int
number of samples
"""
noisy_sum = n_k + random_generator.laplace(
loc=0,
scale=1 / self.epsilon,
size=self.n_mixtures,
)
weights = noisy_sum / n_samples
weights = np.clip(weights, 1e-5, 1)
self.weights = weights / weights.sum()
def sample(
self, n_examples: int, random_generator: np.random.Generator
) -> np.ndarray:
"""Samples n_examples from the fitted model
Parameters
----------
n_examples: int
number of examples to sample
Returns
-------
np.ndarray
array of samples
"""
choices = random_generator.choice(
[i for i in range(self.n_mixtures)],
p=self.weights.squeeze() / self.weights.sum(),
size=n_examples,
replace=True,
)
data = np.stack(
[
multivariate_normal(mean=mean, cov=covariance).rvs(
size=n_examples
)
for mean, covariance in zip(self.means, self.covariances)
],
axis=-1,
)
if self.dimension == 1:
return np.take_along_axis(
data, choices.reshape(-1, 1), axis=-1
).squeeze()
return np.take_along_axis(
data, choices.reshape(-1, 1, 1), axis=-1
).squeeze()
def exponential_mechanism( # pylint:disable=too-many-locals
self,
n_k: np.ndarray,
means: np.ndarray,
unnorm_covariances: np.ndarray,
) -> None:
"""Samples means and covariance matrix for each gaussian via
the exponential mechanism.
Parameters
----------
n_k: np.ndarray
number of elements for each mixture output by the maximization step
means: np.ndarray
empirical means
unnorm_covariances: np.ndarray
empirical covariances non normalized
by n_k
"""
# posterior parameters for the Normal-Gamma
posterior_beta = (
self.prior_beta
+ unnorm_covariances
+ (n_k * self.prior_nu / (n_k + self.prior_nu))[
:, np.newaxis, np.newaxis
]
* np.matmul(
means[:, :, np.newaxis] - self.prior_mean[:, :, np.newaxis],
np.transpose(
means[:, :, np.newaxis]
- self.prior_mean[:, :, np.newaxis],
axes=[0, 2, 1],
),
)
)
posterior_alpha = self.prior_alpha + n_k
posterior_mean = (n_k / (n_k + self.prior_nu))[
:, np.newaxis
] * means + (self.prior_nu / (n_k + self.prior_nu))[
:, np.newaxis
] * self.prior_mean
posterior_nu = n_k + self.prior_nu
# Compute sensitivity
sensitivity = np.log(self.max_determinant) / 2
# Draw covariance matrix from Wishart distribution
degrees_freedom = self.epsilon / sensitivity / 2 * posterior_alpha + (
1 - self.epsilon / sensitivity / 2
) * (self.dimension)
scales = 2 * sensitivity / self.epsilon * np.linalg.inv(posterior_beta)
covariances = np.empty(
shape=(self.n_mixtures, self.dimension, self.dimension)
)
for k in range(self.n_mixtures):
determinant = self.max_determinant + 1
covariance: float = 1
while (determinant > self.max_determinant) or (
np.any(np.abs(covariance) > 0.25)
):
precision = wishart.rvs(
degrees_freedom[k], scale=scales[k], size=1
)
if self.dimension == 1:
precision = precision[np.newaxis, np.newaxis]
determinant = np.linalg.det(precision)
covariance = cast(float, np.linalg.inv(precision))
covariances[k] = covariance
means = np.empty(shape=(self.n_mixtures, self.dimension))
for k in range(self.n_mixtures):
mean = np.zeros(shape=(self.n_mixtures, self.dimension)) - 1
while np.any(mean < 0) or np.any(mean > 1):
mean = multivariate_normal.rvs(
mean=posterior_mean[k],
cov=2
* sensitivity
/ self.epsilon
/ posterior_nu[k]
* covariances[k],
)
means[k] = mean
self.means = means # pylint:disable=attribute-defined-outside-init
self.covariances = (
covariances # pylint:disable=attribute-defined-outside-init
)