# sarus_statistics — DP correlation (package version 4.0.1)
from typing import List, Optional, cast
import numpy as np
import pandas as pd
from statsmodels.stats.weightstats import DescrStatsW
from sarus_statistics.ops.covariance.local import covariance
from sarus_statistics.ops.utils import generator_from_seed, rescale_weights
# Tiny mixing factor: a fraction EPS of `estimated_corr` is added to the DP
# correlation so that NaN entries in the estimate propagate into the result
# while leaving the numeric values essentially unchanged.
EPS = 1e-5
def corr(
    data: pd.DataFrame,
    data_cols: List[str],
    user_col: str,
    private_col: str,
    weight_col: str,
    max_multiplicity: float,
    epsilon: float,
    norm: float,
    dims: Optional[int],
    estimated_corr: Optional[np.ndarray] = None,
    random_generator: Optional[np.random.Generator] = None,
) -> np.ndarray:
    """
    Compute a DP Pearson correlation matrix from the DP covariance.

    Falls back to ``naive_corr`` when the eigenvalue computation inside
    ``covariance`` raises ``LinAlgError`` (e.g. a column filled with
    NaNs), and returns an all-NaN matrix when ``covariance`` asserts
    because every row was dropped due to NaNs.
    """
    n_cols = len(data_cols)
    try:
        cov = covariance(
            data,
            data_cols,
            user_col,
            private_col,
            weight_col,
            max_multiplicity,
            epsilon,
            norm,
            dims,
            random_generator,
        )
    except AssertionError:
        # All rows were dropped because of NaNs: nothing to estimate.
        return np.full((n_cols, n_cols), np.nan)
    except np.linalg.LinAlgError:
        # Spread the budget over the off-diagonal pairs for the naive path.
        return naive_corr(
            data,
            data_cols,
            user_col,
            private_col,
            weight_col,
            max_multiplicity,
            n_cols * (n_cols - 1) // 2 / epsilon,
            random_generator,
        )
    # Normalize the covariance by the outer product of standard deviations.
    diag_stds = np.sqrt(np.diagonal(cov))[np.newaxis, :]
    correlation = cov / diag_stds.T.dot(diag_stds)
    if estimated_corr is not None:
        assert correlation.shape == estimated_corr.shape
        # Mix in a tiny fraction of the estimate so its NaNs propagate.
        correlation += estimated_corr * EPS
    return cast(np.ndarray, np.clip(correlation, -1.0, 1.0))
def naive_corr(
    data: pd.DataFrame,
    data_cols: List[str],
    user_col: str,
    private_col: str,
    weight_col: str,
    max_multiplicity: float,
    noise: float,
    random_generator: Optional[np.random.Generator] = None,
) -> np.ndarray:
    """
    DP Pearson correlation via the Laplace mechanism applied directly to
    the weighted correlation matrix.

    Only the strict upper triangle is perturbed (the diagonal is exactly
    1 by construction); the noise is mirrored to keep the matrix
    symmetric, and the result is clipped to [-1, 1].
    """
    if random_generator is None:
        random_generator = generator_from_seed(None)
    sensitivity = 2  # each correlation entry lies in [-1, 1]
    clean = data.dropna()
    clean = rescale_weights(
        data=clean,
        user_col=user_col,
        private_col=private_col,
        weight_col=weight_col,
        max_multiplicity=max_multiplicity,
    )
    stats = DescrStatsW(clean[data_cols], weights=clean[weight_col])
    corr_matrix = stats.corrcoef
    # One Laplace draw per off-diagonal pair, placed in the upper triangle
    # (diagonal left at zero noise).
    n_cols = len(data_cols)
    n_pairs = n_cols * (n_cols - 1) // 2
    noise_matrix = np.zeros_like(corr_matrix)
    noise_matrix[np.triu_indices(n_cols, 1)] = random_generator.laplace(
        loc=0, scale=noise * sensitivity, size=n_pairs
    )
    # Mirror so that entries (i, j) and (j, i) share the same perturbation.
    noise_matrix = noise_matrix + noise_matrix.T
    return cast(np.ndarray, np.clip(corr_matrix + noise_matrix, -1, 1))