Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""Differentially Private Expectation-Maximization as in Park,from Park,
DP-EM: Differentially Private Expectation Maximization,
https://arxiv.org/pdf/1605.06995.pdf"""
from typing import Optional, cast

import numpy as np
from scipy.special import logsumexp
from scipy.stats import multivariate_normal, wishart

from sarus_statistics.ops.utils import generator_from_seed


class DPDGaussianMixture:  # pylint:disable=too-many-instance-attributes
    """Differentially Private Gaussian Mixture based on the paper from Park,
    DP-EM: Differentially Private Expectation Maximization

    Parameters
    ----------
    n_mixtures: int
        number of mixtures to fit
    epsilon: float
        epsilon to consume for each parameter update
    max_determinant: float
        maximal value for determinant of the precision matrix
        in 1d, corresponds to the minimal variance
    max_iter: int
        number of expectation-maximisation iterations
    dimension: int
        dimension of each gaussian
    prior_alpha: Optioanl[int]
        prior on the number of observations
        used for the prior covariance matrix
    prior_beta: Optional[np.ndarray]
        prior on the non normalized covariance
        matrix
    prior_mean: Optional[np.ndarray]
        prior on the mean
    prior_nu: int
        prior on the number of observations
        used to estimate the mean
    """

    def __init__(  # pylint:disable=too-many-arguments,
        self,
        n_mixtures: int,
        epsilon: float,
        dimension: int,
        max_determinant: float = 1e6,
        max_iter: int = 30,
        prior_alpha: Optional[int] = None,
        prior_beta: Optional[np.ndarray] = None,
        prior_nu: int = 0,
        prior_mean: Optional[np.ndarray] = None,
        random_generator: Optional[np.random.Generator] = None,
    ):
        self.random_generator = (
            random_generator
            if random_generator is not None
            else generator_from_seed(None)
        )
        self.n_mixtures = n_mixtures
        self.epsilon = epsilon
        self.max_determinant = max_determinant
        self.max_iter = max_iter
        self.dimension = dimension
        self.prior_alpha = (
            self.dimension + 2 if prior_alpha is None else prior_alpha
        )
        self.prior_beta = (
            self.prior_alpha
            * np.repeat(
                0.25 * np.eye(self.dimension)[np.newaxis, :],
                self.n_mixtures,
                axis=0,
            )
            if prior_beta is None
            else prior_beta
        )

        self.prior_mean = (
            np.zeros(shape=(self.n_mixtures, self.dimension))
            if prior_mean is None
            else prior_mean
        )
        self.prior_nu = prior_nu

    def fit(  # pylint:disable=attribute-defined-outside-init
        self,
        data: np.ndarray,
    ) -> None:
        """Fits a gaussian mixture to the data by applying Expectation-Maximization

        Parameters
        -----------
        data: np.ndarray
            data to fit
        Raises
        -------
        ValueError
            if abs(data)>1

        Returns
        ---------
        PrivateQuery
            protobuf with all private queries
        """

        if np.amax(np.abs(data)) > 1:
            raise ValueError("This method requires L1 bounded data")

        self._initialise_params(
            data=data, random_generator=self.random_generator
        )
        likes = []
        for _ in range(1, self.max_iter + 1):
            responsabilities = self._expectation_step(data)
            self._maximization_step(
                data=data, responsabilities=responsabilities
            )
            likes.append(self.log_likelihood(data))
        self.like = likes

    def _initialise_params(  # pylint:disable=attribute-defined-outside-init
        self, data: np.ndarray, random_generator: np.random.Generator
    ) -> None:
        """Initialise the weights at random, then compute initial means
        and variances for the EM algorithm

        Parameters
        -----------
        data: np.ndarray
            data to fit
        """

        responsabilities = random_generator.random(
            size=(data.shape[0], self.n_mixtures)
        )
        responsabilities /= responsabilities.sum(axis=1)[:, np.newaxis]
        n_k = responsabilities.sum(axis=0)
        self.weights = n_k / data.shape[0]

        means = np.dot(responsabilities.T, data) / n_k[:, np.newaxis]
        non_normalized_covariances = np.matmul(
            responsabilities.T[:, np.newaxis]
            * np.transpose(
                data[np.newaxis]
                - means[
                    :,
                    np.newaxis,
                ],
                [0, 2, 1],
            ),
            data[np.newaxis]
            - means[
                :,
                np.newaxis,
            ],
        )
        self.exponential_mechanism(n_k, means, non_normalized_covariances)

    def _expectation_step(self, data: np.ndarray) -> np.ndarray:
        """Computes responsabilities for each latent variable

        Parameters
        -----------
        data: np.ndarray
            data to fit

        Returns
        -------
        np.ndarray
            array of mixtures weights for each datapoint
        """

        responsabilities = np.array(
            [
                weight
                * multivariate_normal(
                    mean=mean, cov=covariance, allow_singular=True
                ).pdf(data)
                for weight, mean, covariance in zip(
                    self.weights, self.means, self.covariances
                )
            ]
        ).transpose()

        responsabilities = responsabilities / (
            responsabilities.sum(axis=1, keepdims=True)
        )
        return responsabilities

    def log_likelihood(self, data: np.ndarray) -> np.ndarray:
        """Computes loglikelihood of the data with the current
        gaussian mixtures

        Parameters
        -----------
        data:np.ndarray
            data to compute the log likelihood

        Returns
        -------
        np.ndarray
            log likelihood
        """

        single = (
            -self.dimension * np.log(2 * np.pi) / 2
            - 0.5 * np.log(np.linalg.det(self.covariances)).reshape(1, -1)
            - 0.5
            * np.matmul(
                np.transpose(
                    (
                        data[
                            :,
                            np.newaxis,
                        ]
                        - self.means[np.newaxis, :, :]
                    )[:, :, :, np.newaxis],
                    [0, 1, 3, 2],
                ),
                np.matmul(
                    np.linalg.inv(self.covariances),
                    (
                        data[
                            :,
                            np.newaxis,
                        ]
                        - self.means[np.newaxis, :, :]
                    )[:, :, :, np.newaxis],
                ),
            ).squeeze()
            + np.log(self.weights).reshape(1, -1)
        )
        log_like: np.ndarray = np.sum(logsumexp(single, axis=1))
        return log_like

    def _maximization_step(
        self, data: np.ndarray, responsabilities: np.ndarray
    ) -> None:
        """Computes differentially private maximization_step

        Parameters
        -----------
        data: np.ndarray
            data to fit
        responsabilities: np.ndarray
            array of mixtures weights for each datapoint
        """

        n_k = responsabilities.sum(axis=0)
        means = np.dot(responsabilities.T, data) / n_k[:, np.newaxis]

        non_normalized_covariances = np.matmul(
            responsabilities.T[:, np.newaxis]
            * np.transpose(
                data[np.newaxis]
                - means[
                    :,
                    np.newaxis,
                ],
                [0, 2, 1],
            ),
            data[np.newaxis]
            - means[
                :,
                np.newaxis,
            ],
        )

        self.exponential_mechanism(n_k, means, non_normalized_covariances)
        self._update_weights(
            n_k,
            n_samples=data.shape[0],
            random_generator=self.random_generator,
        )

    def _update_weights(  # pylint:disable=attribute-defined-outside-init
        self,
        n_k: np.ndarray,
        n_samples: int,
        random_generator: np.random.Generator,
    ) -> None:
        """Updates weights by adding laplace noise

        Parameters
        ----------
        n_k: np.ndarray
            number of elements for each mixture output by the maximization step
        n_samples: int
            number of samples
        """

        noisy_sum = n_k + random_generator.laplace(
            loc=0,
            scale=1 / self.epsilon,
            size=self.n_mixtures,
        )
        weights = noisy_sum / n_samples
        weights = np.clip(weights, 1e-5, 1)
        self.weights = weights / weights.sum()

    def sample(
        self, n_examples: int, random_generator: np.random.Generator
    ) -> np.ndarray:
        """Samples n_examples from the fitted model

        Parameters
        ----------
        n_examples: int
            number of examples to sample

        Returns
        -------
        np.ndarray
            array of samples
        """
        choices = random_generator.choice(
            [i for i in range(self.n_mixtures)],
            p=self.weights.squeeze() / self.weights.sum(),
            size=n_examples,
            replace=True,
        )
        data = np.stack(
            [
                multivariate_normal(mean=mean, cov=covariance).rvs(
                    size=n_examples
                )
                for mean, covariance in zip(self.means, self.covariances)
            ],
            axis=-1,
        )

        if self.dimension == 1:
            return np.take_along_axis(
                data, choices.reshape(-1, 1), axis=-1
            ).squeeze()

        return np.take_along_axis(
            data, choices.reshape(-1, 1, 1), axis=-1
        ).squeeze()

    def exponential_mechanism(  # pylint:disable=too-many-locals
        self,
        n_k: np.ndarray,
        means: np.ndarray,
        unnorm_covariances: np.ndarray,
    ) -> None:
        """Samples means and covariance matrix for each gaussian via
        the exponential mechanism.

        Parameters
        ----------
        n_k: np.ndarray
            number of elements for each mixture output by the maximization step
        means: np.ndarray
            empirical means
        unnorm_covariances: np.ndarray
            empirical covariances non normalized
            by n_k
        """
        # posterior parameters for the Normal-Gamma
        posterior_beta = (
            self.prior_beta
            + unnorm_covariances
            + (n_k * self.prior_nu / (n_k + self.prior_nu))[
                :, np.newaxis, np.newaxis
            ]
            * np.matmul(
                means[:, :, np.newaxis] - self.prior_mean[:, :, np.newaxis],
                np.transpose(
                    means[:, :, np.newaxis]
                    - self.prior_mean[:, :, np.newaxis],
                    axes=[0, 2, 1],
                ),
            )
        )
        posterior_alpha = self.prior_alpha + n_k
        posterior_mean = (n_k / (n_k + self.prior_nu))[
            :, np.newaxis
        ] * means + (self.prior_nu / (n_k + self.prior_nu))[
            :, np.newaxis
        ] * self.prior_mean
        posterior_nu = n_k + self.prior_nu

        # Compute sensitivity
        sensitivity = np.log(self.max_determinant) / 2

        # Draw covariance matrix from Wishart distribution
        degrees_freedom = self.epsilon / sensitivity / 2 * posterior_alpha + (
            1 - self.epsilon / sensitivity / 2
        ) * (self.dimension)
        scales = 2 * sensitivity / self.epsilon * np.linalg.inv(posterior_beta)
        covariances = np.empty(
            shape=(self.n_mixtures, self.dimension, self.dimension)
        )
        for k in range(self.n_mixtures):
            determinant = self.max_determinant + 1
            covariance: float = 1
            while (determinant > self.max_determinant) or (
                np.any(np.abs(covariance) > 0.25)
            ):
                precision = wishart.rvs(
                    degrees_freedom[k], scale=scales[k], size=1
                )
                if self.dimension == 1:
                    precision = precision[np.newaxis, np.newaxis]
                determinant = np.linalg.det(precision)
                covariance = cast(float, np.linalg.inv(precision))

            covariances[k] = covariance

        means = np.empty(shape=(self.n_mixtures, self.dimension))
        for k in range(self.n_mixtures):
            mean = np.zeros(shape=(self.n_mixtures, self.dimension)) - 1
            while np.any(mean < 0) or np.any(mean > 1):
                mean = multivariate_normal.rvs(
                    mean=posterior_mean[k],
                    cov=2
                    * sensitivity
                    / self.epsilon
                    / posterior_nu[k]
                    * covariances[k],
                )
            means[k] = mean
        self.means = means  # pylint:disable=attribute-defined-outside-init
        self.covariances = (
            covariances  # pylint:disable=attribute-defined-outside-init
        )