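"""Tests for the differentially private correlation (`corr`) and
covariance (`covariance_eig`) operators of `sarus_statistics`."""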
import typing as t
import numpy as np
import pandas as pd
import pytest
from sarus_statistics.ops.corr.local import corr
from sarus_statistics.ops.covariance.diffprivlib_cov import (
covariance_eig,
scale_clip,
)
np.random.seed(0)
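# A very large epsilon makes the DP noise negligible, so noisy results
# should match the exact (non-private) computation.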
EPSILON = 1e9
def generate_correlated_dataframe(num_samples=1000, correlation=0.0):
"""It generates a pandas dataframe with 2 columns with a chosen
correlation.
"""
    # Generate a standard normal base variable
x = np.random.normal(0, 1, num_samples)
# Introduce correlation by scaling one variable and adding noise
y = correlation * x + np.sqrt(1 - correlation**2) * np.random.normal(
0, 1, num_samples
)
return pd.DataFrame({'X': x, 'Y': y})
def scale_clip_data(
data: pd.DataFrame,
data_cols: t.List[str],
quantile_max_norm: int,
admin_cols: t.Tuple[str, str, str],
):
"""It scales the data set such that the max norm for each entry is
at most 1. It returns the dataframe and the max norm.
"""
public, user_col, weights = admin_cols
row_norms = np.linalg.norm(data[data_cols].fillna(0.0), axis=1)
max_norm = np.percentile(row_norms, quantile_max_norm)
admin_data = data[[public, user_col, weights]]
data_df = pd.DataFrame(
scale_clip(
data[data_cols].to_numpy(),
admin_data[weights].to_numpy(),
max_norm,
),
columns=data_cols,
index=data.index,
)
return data_df.join(admin_data), max_norm
def corr_from_cov(cov):
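    """Turn a covariance matrix into a correlation matrix by dividing by
    the outer product of the column standard deviations.
    """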
stds = np.sqrt(np.diagonal(cov))
stds = np.expand_dims(stds, 0)
stds = stds.T.dot(stds)
return cov / stds
def noisy_c_from_covariance_eig(
array: np.ndarray, weights: np.ndarray, epsilon: float, random_state: int
):
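    """Reconstruct the noisy second-moment matrix from the differentially
    private eigendecomposition returned by `covariance_eig`.
    """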
eigenvalues, eigenvectors = covariance_eig(
array,
weights=weights,
epsilon=epsilon,
random_state=random_state,
)
return eigenvectors.dot(np.diag(eigenvalues)).dot(eigenvectors.T)
def test_corr_with_column_normalization(ops_data, admin_cols):
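    """Standardized, centered and clipped data: the DP correlation averaged
    over 50 runs at epsilon=10 should match the exact correlation within 0.1.
    """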
data_cols = ['integer_pv', 'integer', 'bool']
public, user_col, weights = admin_cols
data = ops_data[[*data_cols, public, user_col, weights]]
data[data_cols] = data[data_cols].astype(float)
true_corr = data[data_cols].corr().to_numpy()
data[data_cols] = data[data_cols] / data[data_cols].std()
avgs = data[data_cols].mean(axis=0).to_numpy()
data[data_cols] = data[data_cols] - avgs
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
experiments = 50
corr_value = np.mean(
[
corr(
clipped_data,
data_cols=data_cols,
user_col=user_col,
private_col=public,
weight_col=weights,
epsilon=10.0,
max_multiplicity=10,
norm=max_norm,
dims=None,
)
for _ in range(experiments)
],
axis=0,
)
assert np.isclose(corr_value, true_corr, atol=0.1).all()
def test_corr_with_dims(ops_data, admin_cols):
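    """With `dims=2` the returned correlation matrix should still have the
    full (3, 3) shape.
    """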
data_cols = ['integer_pv', 'integer', 'bool']
public, user_col, weights = admin_cols
data = ops_data[[*data_cols, public, user_col, weights]]
data[data_cols] = data[data_cols].astype(float)
data[data_cols] = data[data_cols] / data[data_cols].std()
data[data_cols] = data[data_cols] - data[data_cols].mean()
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
corr_value = corr(
clipped_data,
data_cols=data_cols,
user_col=user_col,
private_col=public,
weight_col=weights,
epsilon=10.0,
max_multiplicity=10,
norm=max_norm,
dims=2,
)
assert corr_value.shape == (3, 3)
def test_corr_with_nan(ops_data, admin_cols):
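    """A single NaN entry should not break the computation: with a huge
    epsilon the DP correlation should match the pandas correlation.
    """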
public, user_col, weights = admin_cols
data_cols = ['integer_pv', 'integer', 'bool']
data = ops_data[[*data_cols, public, user_col, weights]]
# add NaN for testing
data.loc[0, 'integer_pv'] = np.nan
data[data_cols] = data[data_cols].astype(float)
true_corr = data[data_cols].corr().to_numpy()
    # simulate column normalization and clipping using info from the synthetic data
data[data_cols] = data[data_cols] / data[data_cols].std()
avgs = data[data_cols].mean(axis=0).to_numpy()
data[data_cols] = data[data_cols] - avgs
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
corr_value = corr(
clipped_data,
data_cols=data_cols,
user_col=user_col,
private_col=public,
weight_col=weights,
epsilon=EPSILON,
max_multiplicity=10,
norm=max_norm,
dims=None,
)
assert pytest.approx(corr_value, 0.1) == true_corr
def test_corr_with_nan_full_row(ops_data, admin_cols):
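    """Same as the previous test, but with a full row of NaN values."""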
public, user_col, weights = admin_cols
data_cols = ['integer_pv', 'integer', 'bool']
data = ops_data[[*data_cols, public, user_col, weights]]
# add NaN for testing
data.loc[0, :] = np.nan
data[data_cols] = data[data_cols].astype(float)
true_corr = data[data_cols].corr().to_numpy()
    # simulate column normalization and clipping using info from the synthetic data
data[data_cols] = data[data_cols] / data[data_cols].std()
avgs = data[data_cols].mean(axis=0).to_numpy()
data[data_cols] = data[data_cols] - avgs
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
corr_value = corr(
clipped_data,
data_cols=data_cols,
user_col=user_col,
private_col=public,
weight_col=weights,
epsilon=EPSILON,
max_multiplicity=10,
norm=max_norm,
dims=None,
)
assert pytest.approx(corr_value, 0.1) == true_corr
def test_corr_full_nan(ops_data, admin_cols):
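    """A column that is entirely NaN should yield an all-NaN (2, 2)
    correlation matrix.
    """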
public, user_col, weights = admin_cols
data_cols = ['integer', 'optional_full_none']
data = ops_data[[*data_cols, public, user_col, weights]]
data[data_cols] = data[data_cols].astype(float)
data[data_cols] = data[data_cols] / data[data_cols].std()
data[data_cols] = data[data_cols] - data[data_cols].mean()
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
corr_value = corr(
        clipped_data,
data_cols=data_cols,
user_col=user_col,
private_col=public,
weight_col=weights,
epsilon=EPSILON,
max_multiplicity=10,
norm=max_norm,
dims=None,
)
assert np.isnan(corr_value).all() and corr_value.shape == (2, 2)
def test_corr_one_col(ops_data, admin_cols):
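    """A single data column should yield a (1, 1) correlation equal to 1."""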
public, user_col, weights = admin_cols
data_cols = ['integer']
data = ops_data[[*data_cols, public, user_col, weights]]
data[data_cols] = data[data_cols].astype(float)
data[data_cols] = data[data_cols] / data[data_cols].std()
data[data_cols] = data[data_cols] - data[data_cols].mean()
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
corr_value = corr(
        clipped_data,
data_cols=data_cols,
user_col=user_col,
private_col=public,
weight_col=weights,
epsilon=EPSILON,
max_multiplicity=10,
norm=max_norm,
dims=None,
)
assert corr_value.shape == (1, 1) and corr_value[0, 0] == 1.0
def test_corr_zero_col(ops_data, admin_cols):
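    """With no data columns the result should contain no non-zero entries."""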
public, user_col, weights = admin_cols
data_cols = []
data = ops_data[[public, user_col, weights]]
data[data_cols] = data[data_cols].astype(float)
data[data_cols] = data[data_cols] / data[data_cols].std()
data[data_cols] = data[data_cols] - data[data_cols].mean()
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
corr_value = corr(
        clipped_data,
data_cols=[],
user_col=user_col,
private_col=public,
weight_col=weights,
epsilon=EPSILON,
max_multiplicity=10,
norm=max_norm,
dims=None,
)
assert not corr_value.any()
def test_corr_zero_variance_col(ops_data, admin_cols):
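    """A constant column has undefined correlation: the NaN pattern of the
    DP result should match the NaN pattern of the exact correlation.
    """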
public, user_col, weights = admin_cols
data_cols = ['integer_pv', 'integer', 'bool']
data = ops_data[[*data_cols, public, user_col, weights]]
data['ones'] = 1.0
data_cols.append('ones')
data[data_cols] = data[data_cols].astype(float)
true_corr = data[data_cols].corr().to_numpy()
data[data_cols] = data[data_cols] / np.maximum(data[data_cols].std(), 1.0)
avgs = data[data_cols].mean(axis=0).to_numpy()
data[data_cols] = data[data_cols] - avgs
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
corr_value = corr(
clipped_data,
data_cols=data_cols,
user_col=user_col,
private_col=public,
weight_col=weights,
epsilon=1.0,
max_multiplicity=10,
norm=max_norm,
dims=None,
estimated_corr=true_corr,
)
assert (np.isnan(corr_value[-1]) == np.isnan(true_corr[-1])).all()
assert (np.isnan(corr_value[:, -1]) == np.isnan(true_corr[:, -1])).all()
def test_corr_identical_columns(ops_data, admin_cols):
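    """Two identical columns: the DP correlation averaged over 50 runs
    should match the exact (all-ones) correlation within 10%.
    """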
public, user_col, weights = admin_cols
data = ops_data[['integer', public, user_col, weights]]
data['integer_bis'] = data['integer']
data_cols = ['integer', 'integer_bis']
data[data_cols] = data[data_cols].astype(float)
true_corr = data[data_cols].corr().to_numpy()
data[data_cols] = data[data_cols] - data[data_cols].mean()
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
experiments = 50
corr_value = np.mean(
[
corr(
clipped_data,
data_cols=data_cols,
user_col=user_col,
private_col=public,
weight_col=weights,
epsilon=5.0,
max_multiplicity=10,
norm=max_norm,
dims=None,
)
for _ in range(experiments)
],
axis=0,
)
assert pytest.approx(corr_value, 0.1) == true_corr
def test_noisy_cov_big_epsilon(ops_data, admin_cols):
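    """With a huge epsilon and clipping at the 100th percentile, the noisy
    scatter matrix, the covariance of the clipped data and the rescaled
    covariance should all match their exact counterparts.
    """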
public, user_col, weights = admin_cols
data_cols = ['integer_pv', 'integer', 'bool']
data = ops_data[[*data_cols, public, user_col, weights]]
admin_data = data[[public, user_col, weights]]
data[data_cols] = data[data_cols].astype(float)
true_cov = data[data_cols].cov().to_numpy()
n = len(data)
# CLIPPING
row_norms = np.linalg.norm(data[data_cols], axis=1)
max_norm = np.percentile(row_norms, 100)
clipped_data = data[data_cols] / np.maximum(row_norms, max_norm)[:, None]
clipped_data = clipped_data.join(admin_data)
# computing some quantities used to compare intermediate results
clipped_cov = clipped_data[data_cols].cov().to_numpy()
X = clipped_data[data_cols].to_numpy()
C = X.T.dot(X)
avgs_clipped = clipped_data[data_cols].mean().to_numpy()
mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])
assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8
epsilon = 1e9
# comparable with C
noisy_c = noisy_c_from_covariance_eig(
array=X,
weights=clipped_data[weights].to_numpy(),
epsilon=epsilon,
random_state=1,
)
assert np.isclose(noisy_c, C, atol=1e-2).all()
# comparable with clipped_data.cov()
nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
assert np.isclose(nc_noisy_c, clipped_cov, atol=1e-5).all()
# comparable with true_cov
noisy_cov_ = nc_noisy_c * (max_norm**2)
assert np.isclose(noisy_cov_, true_cov, atol=2).all()
def test_noisy_cov_big_epsilon_clipping_50(ops_data, admin_cols):
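    """Clipping at the 50th percentile distorts the data, so the rescaled
    noisy covariance should no longer match the true covariance.
    """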
public, user_col, weights = admin_cols
data_cols = ['integer_pv', 'integer', 'bool']
data = ops_data[[*data_cols, public, user_col, weights]]
data[data_cols] = data[data_cols].astype(float)
true_cov = data[data_cols].cov().to_numpy()
n = len(data)
# CLIPPING
clipped_data, max_norm = scale_clip_data(data, data_cols, 50, admin_cols)
# computing some quantities used to compare intermediate results
clipped_cov = clipped_data[data_cols].cov().to_numpy()
X = clipped_data[data_cols].to_numpy()
C = X.T.dot(X)
avgs_clipped = clipped_data[data_cols].mean().to_numpy()
mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])
assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8
epsilon = 1e9
# comparable with C
noisy_c = noisy_c_from_covariance_eig(
X,
weights=clipped_data[weights].to_numpy(),
epsilon=epsilon,
random_state=1,
)
assert np.isclose(noisy_c, C, atol=1e-2).all()
# comparable with clipped_data.cov()
nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
assert np.isclose(nc_noisy_c, clipped_cov, atol=1e-5).all()
# comparable with true_cov
unscaled_noisy_c = nc_noisy_c * (max_norm**2)
assert not np.isclose(unscaled_noisy_c, true_cov, atol=2).all()
def test_noisy_cov_small_epsilon(ops_data, admin_cols):
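    """With epsilon=2 and 50 averaged draws, the dominant entry (driven by
    the `integer` column) should stay close to its exact counterpart at
    each stage of the computation.
    """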
data_cols = ['integer_pv', 'integer', 'bool']
public, user_col, weights = admin_cols
data = ops_data[[*data_cols, public, user_col, weights]]
data[data_cols] = data[data_cols].astype(float)
true_cov = data[data_cols].cov().to_numpy()
n = len(data)
n_experiments = 50
epsilon = 2.0
# CLIPPING
clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)
# computing some quantities used to compare intermediate results
clipped_cov = clipped_data[data_cols].cov().to_numpy()
X = clipped_data[data_cols].to_numpy()
C = X.T.dot(X)
avgs_clipped = clipped_data[data_cols].mean().to_numpy()
mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])
assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8
# comparable with C
noisy_cs = [
noisy_c_from_covariance_eig(
X,
weights=clipped_data[weights].to_numpy(),
epsilon=epsilon,
random_state=None,
)
for _ in range(n_experiments)
]
noisy_c = np.mean(noisy_cs, axis=0)
noisy_c_max_value = noisy_c[1][1]
    assert noisy_c_max_value == noisy_c.max()
    # compare only the max value: C[1][1] = 312..
    # it is quite high with respect to the other terms because
    # the second column `integer` has values from 1 to 999
assert np.isclose(noisy_c_max_value, C.max(), atol=10)
nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
# clipped_cov.max() = 0.084
assert np.isclose(nc_noisy_c[1][1], clipped_cov.max(), atol=0.01)
# comparable with true_cov
unscaled_noisy_cov = nc_noisy_c * (max_norm**2)
# true_cov.max() = 83787.
assert np.isclose(unscaled_noisy_cov[1][1], true_cov.max(), rtol=0.1)
def test_noisy_cov_non_correlated_cols(admin_cols):
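    """Synthetic uncorrelated columns: the diagonal entries of the noisy
    quantities should match the exact ones at each stage.
    """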
public, user_col, weights = admin_cols
data = generate_correlated_dataframe(num_samples=1000, correlation=0.0)
data_cols = ["X", "Y"]
true_cov = data.cov().to_numpy()
n = len(data)
data[public] = False
data[user_col] = np.arange(n)
data[weights] = 1.0
clipped_data, max_norm = scale_clip_data(
data, data_cols=data_cols, quantile_max_norm=100, admin_cols=admin_cols
)
# computing some quantities used to compare intermediate results
clipped_cov = clipped_data[data_cols].cov().to_numpy()
X = clipped_data[data_cols].to_numpy()
C = X.T.dot(X)
avgs_clipped = clipped_data[data_cols].mean().to_numpy()
mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])
assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8
epsilon = 2.0
n_experiments = 50
# comparable with C
noisy_cs = [
noisy_c_from_covariance_eig(
X,
weights=clipped_data[weights].to_numpy(),
epsilon=epsilon,
random_state=None,
)
for _ in range(n_experiments)
]
# take the average of observations
noisy_c = np.mean(noisy_cs, axis=0)
# comparing only diagonal terms (around 70).
# the off-diagonal terms are close to zero with some noise
# np.diag(C): [74.41819068, 69.59160969]
assert np.isclose(np.diag(noisy_c), np.diag(C), atol=4).all()
# comparable with clipped_data.cov()
nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
    # np.diag(clipped_cov): [0.0743364, 0.06946103]
assert np.isclose(
np.diag(nc_noisy_c), np.diag(clipped_cov), atol=0.005
).all()
# comparable with true_cov
noisy_cov_ = nc_noisy_c * (max_norm**2)
    # np.diag(true_cov): [0.97520967, 0.91125022]
assert np.isclose(np.diag(noisy_cov_), np.diag(true_cov), atol=0.5).all()
def test_noisy_cov_fully_correlated_cols(admin_cols):
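    """Fully correlated columns (Y == X): the noisy covariance and the
    derived correlation should match the exact values entry-wise.
    """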
x = np.random.normal(0, 1, 1000)
y = x
data = pd.DataFrame({'X': x, 'Y': y})
data_cols = ["X", "Y"]
true_cov = data.cov().to_numpy()
true_corr = data.corr().to_numpy()
n = len(data)
public, user_col, weights = admin_cols
data[public] = False
data[user_col] = np.arange(n)
data[weights] = 1.0
clipped_data, max_norm = scale_clip_data(
data, data_cols=data_cols, quantile_max_norm=100, admin_cols=admin_cols
)
# computing some quantities used to compare intermediate results
clipped_cov = clipped_data[data_cols].cov().to_numpy()
X = clipped_data[data_cols].to_numpy()
C = X.T.dot(X)
avgs_clipped = clipped_data[data_cols].mean().to_numpy()
mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])
assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8
epsilon = 2.0
n_experiments = 50
# comparable with C
noisy_cs = [
noisy_c_from_covariance_eig(
X,
weights=clipped_data[weights].to_numpy(),
epsilon=epsilon,
random_state=None,
)
for _ in range(n_experiments)
]
# take the average of observations
noisy_c = np.mean(noisy_cs, axis=0)
# C[0][0]: 52.60717426 (all elements are the same)
assert np.isclose(noisy_c, C, atol=10).all()
# comparable with clipped_data.cov()
nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
# clipped_cov[0][0]: 0.05254936
assert np.isclose(nc_noisy_c, clipped_cov, atol=0.01).all()
# comparable with true_cov
noisy_cov_ = nc_noisy_c * (max_norm**2)
# true_cov[0][0]: 0.97520967
assert np.isclose(noisy_cov_, true_cov, atol=0.25).all()
assert np.isclose(corr_from_cov(noisy_cov_), true_corr, atol=0.2).all()