Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
sarus_statistics / tests / unit / test_ops / test_corr.py
Size: Mime:
import typing as t

import numpy as np
import pandas as pd
import pytest

from sarus_statistics.ops.corr.local import corr
from sarus_statistics.ops.covariance.diffprivlib_cov import (
    covariance_eig,
    scale_clip,
)

np.random.seed(0)
EPSILON = 1e9


def generate_correlated_dataframe(num_samples=1000, correlation=0.0):
    """Build a two-column DataFrame ('X', 'Y') whose columns have the
    requested Pearson correlation (in expectation).
    """
    # First variable: standard normal draws.
    x = np.random.normal(0, 1, num_samples)

    # Second variable: a scaled copy of x plus independent noise; the
    # coefficients are chosen so Var(y) == 1 and corr(x, y) == correlation.
    noise = np.random.normal(0, 1, num_samples)
    y = correlation * x + np.sqrt(1 - correlation**2) * noise
    return pd.DataFrame({'X': x, 'Y': y})


def scale_clip_data(
    data: pd.DataFrame,
    data_cols: t.List[str],
    quantile_max_norm: int,
    admin_cols: t.Tuple[str, str, str],
):
    """Rescale/clip the rows of ``data[data_cols]`` so every row L2 norm is
    at most 1, leaving the administrative columns untouched.

    Returns the transformed data columns joined back with the admin
    columns, together with the norm used for the scaling.
    """
    public_col, user_col, weight_col = admin_cols

    # Clipping threshold: the requested percentile of the row L2 norms.
    # NaNs are treated as 0 for the norm computation only.
    norms = np.linalg.norm(data[data_cols].fillna(0.0), axis=1)
    max_norm = np.percentile(norms, quantile_max_norm)

    admin_part = data[[public_col, user_col, weight_col]]

    scaled = scale_clip(
        data[data_cols].to_numpy(),
        admin_part[weight_col].to_numpy(),
        max_norm,
    )
    scaled_df = pd.DataFrame(scaled, columns=data_cols, index=data.index)
    return scaled_df.join(admin_part), max_norm


def corr_from_cov(cov):
    """Convert a covariance matrix into a correlation matrix.

    Each entry cov[i, j] is divided by std_i * std_j, where the standard
    deviations are the square roots of the diagonal. Zero-variance
    entries yield NaNs (division by zero), which callers rely on.
    """
    std = np.sqrt(np.diagonal(cov))
    # Outer product std_i * std_j is the element-wise denominator.
    denom = np.outer(std, std)
    return cov / denom


def noisy_c_from_covariance_eig(
    array: np.ndarray, weights: np.ndarray, epsilon: float, random_state: int
):
    """Recompose the (noisy) scatter matrix from the private
    eigendecomposition returned by ``covariance_eig``.
    """
    eigvals, eigvecs = covariance_eig(
        array,
        weights=weights,
        epsilon=epsilon,
        random_state=random_state,
    )
    # V @ diag(lambda) @ V^T rebuilds the matrix from its spectrum.
    return eigvecs.dot(np.diag(eigvals)).dot(eigvecs.T)


def test_corr_with_column_normalization(ops_data, admin_cols):
    """DP corr averaged over several runs should approximate the exact
    correlation matrix after column normalization and row clipping.
    """
    data_cols = ['integer_pv', 'integer', 'bool']
    public, user_col, weights = admin_cols
    # Explicit copy: ops_data is a shared fixture and the slice is mutated
    # in place below (avoids SettingWithCopyWarning and any risk of
    # leaking changes into other tests).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()

    data[data_cols] = data[data_cols].astype(float)
    true_corr = data[data_cols].corr().to_numpy()

    # Simulate column normalization (unit std, zero mean), then clip rows.
    data[data_cols] = data[data_cols] / data[data_cols].std()
    avgs = data[data_cols].mean(axis=0).to_numpy()
    data[data_cols] = data[data_cols] - avgs
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    # Average several independent DP runs to reduce the noise.
    experiments = 50
    corr_value = np.mean(
        [
            corr(
                clipped_data,
                data_cols=data_cols,
                user_col=user_col,
                private_col=public,
                weight_col=weights,
                epsilon=10.0,
                max_multiplicity=10,
                norm=max_norm,
                dims=None,
            )
            for _ in range(experiments)
        ],
        axis=0,
    )

    assert np.isclose(corr_value, true_corr, atol=0.1).all()


def test_corr_with_dims(ops_data, admin_cols):
    """With ``dims`` set, corr must still return a full d x d matrix."""
    data_cols = ['integer_pv', 'integer', 'bool']
    public, user_col, weights = admin_cols
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()
    data[data_cols] = data[data_cols].astype(float)
    # Column normalization (unit std, zero mean), then row clipping.
    data[data_cols] = data[data_cols] / data[data_cols].std()
    data[data_cols] = data[data_cols] - data[data_cols].mean()
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    corr_value = corr(
        clipped_data,
        data_cols=data_cols,
        user_col=user_col,
        private_col=public,
        weight_col=weights,
        epsilon=10.0,
        max_multiplicity=10,
        norm=max_norm,
        dims=2,  # keep only 2 eigen-dimensions; output shape is unchanged
    )
    assert corr_value.shape == (3, 3)


def test_corr_with_nan(ops_data, admin_cols):
    """A single NaN entry must not break corr; with a huge epsilon the
    result should match pandas' pairwise-complete correlation.
    """
    public, user_col, weights = admin_cols
    data_cols = ['integer_pv', 'integer', 'bool']
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()

    # add NaN for testing
    data.loc[0, 'integer_pv'] = np.nan
    data[data_cols] = data[data_cols].astype(float)
    true_corr = data[data_cols].corr().to_numpy()

    # simulate column normalization and clipping with infos from synth data
    data[data_cols] = data[data_cols] / data[data_cols].std()
    avgs = data[data_cols].mean(axis=0).to_numpy()
    data[data_cols] = data[data_cols] - avgs
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    corr_value = corr(
        clipped_data,
        data_cols=data_cols,
        user_col=user_col,
        private_col=public,
        weight_col=weights,
        epsilon=EPSILON,  # effectively noiseless
        max_multiplicity=10,
        norm=max_norm,
        dims=None,
    )
    assert pytest.approx(corr_value, 0.1) == true_corr


def test_corr_with_nan_full_row(ops_data, admin_cols):
    """An entirely-NaN row (admin columns included) must not break corr;
    with a huge epsilon the result should match pandas' correlation.
    """
    public, user_col, weights = admin_cols
    data_cols = ['integer_pv', 'integer', 'bool']
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()

    # add NaN for testing: the whole first row, admin columns included
    data.loc[0, :] = np.nan
    data[data_cols] = data[data_cols].astype(float)
    true_corr = data[data_cols].corr().to_numpy()

    # simulate column normalization and clipping with infos from synth data
    data[data_cols] = data[data_cols] / data[data_cols].std()
    avgs = data[data_cols].mean(axis=0).to_numpy()
    data[data_cols] = data[data_cols] - avgs
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    corr_value = corr(
        clipped_data,
        data_cols=data_cols,
        user_col=user_col,
        private_col=public,
        weight_col=weights,
        epsilon=EPSILON,  # effectively noiseless
        max_multiplicity=10,
        norm=max_norm,
        dims=None,
    )
    assert pytest.approx(corr_value, 0.1) == true_corr


def test_corr_full_nan(ops_data, admin_cols):
    """A column that is entirely NaN must yield an all-NaN corr matrix
    of the expected shape."""
    public, user_col, weights = admin_cols
    data_cols = ['integer', 'optional_full_none']
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()
    data[data_cols] = data[data_cols].astype(float)

    data[data_cols] = data[data_cols] / data[data_cols].std()
    data[data_cols] = data[data_cols] - data[data_cols].mean()
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    # NOTE(review): corr is called on `data`, not `clipped_data` — the
    # clipped frame only contributes max_norm here. Presumably intentional
    # (the all-NaN column makes clipping moot), but worth confirming.
    corr_value = corr(
        data,
        data_cols=data_cols,
        user_col=user_col,
        private_col=public,
        weight_col=weights,
        epsilon=EPSILON,
        max_multiplicity=10,
        norm=max_norm,
        dims=None,
    )
    assert np.isnan(corr_value).all() and corr_value.shape == (2, 2)


def test_corr_one_col(ops_data, admin_cols):
    """A single column must correlate perfectly with itself (1x1 matrix
    of exactly 1.0)."""
    public, user_col, weights = admin_cols
    data_cols = ['integer']
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()
    data[data_cols] = data[data_cols].astype(float)

    data[data_cols] = data[data_cols] / data[data_cols].std()
    data[data_cols] = data[data_cols] - data[data_cols].mean()
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    # NOTE(review): `data` (not `clipped_data`) is passed to corr; the
    # clipped frame only contributes max_norm. Confirm this is intended.
    corr_value = corr(
        data,
        data_cols=data_cols,
        user_col=user_col,
        private_col=public,
        weight_col=weights,
        epsilon=EPSILON,
        max_multiplicity=10,
        norm=max_norm,
        dims=None,
    )

    assert corr_value.shape == (1, 1) and corr_value[0, 0] == 1.0


def test_corr_zero_col(ops_data, admin_cols):
    """With no data columns, corr must return an empty/all-falsy result."""
    public, user_col, weights = admin_cols
    data_cols = []
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[public, user_col, weights]].copy()

    # All of these are no-ops on an empty column list; kept to mirror the
    # preprocessing pipeline of the other tests.
    data[data_cols] = data[data_cols].astype(float)
    data[data_cols] = data[data_cols] / data[data_cols].std()
    data[data_cols] = data[data_cols] - data[data_cols].mean()
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    # NOTE(review): `data` (not `clipped_data`) is passed to corr here;
    # with zero data columns the two are equivalent anyway.
    corr_value = corr(
        data,
        data_cols=[],
        user_col=user_col,
        private_col=public,
        weight_col=weights,
        epsilon=EPSILON,
        max_multiplicity=10,
        norm=max_norm,
        dims=None,
    )

    assert not corr_value.any()


def test_corr_zero_variance_col(ops_data, admin_cols):
    """A constant column must produce NaNs in its corr row/column, with
    the same NaN placement as the exact correlation matrix."""
    public, user_col, weights = admin_cols
    data_cols = ['integer_pv', 'integer', 'bool']
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()
    data['ones'] = 1.0  # zero-variance column
    data_cols.append('ones')

    data[data_cols] = data[data_cols].astype(float)
    true_corr = data[data_cols].corr().to_numpy()

    # max(std, 1) avoids dividing the constant column by its zero std.
    data[data_cols] = data[data_cols] / np.maximum(data[data_cols].std(), 1.0)
    avgs = data[data_cols].mean(axis=0).to_numpy()
    data[data_cols] = data[data_cols] - avgs
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    corr_value = corr(
        clipped_data,
        data_cols=data_cols,
        user_col=user_col,
        private_col=public,
        weight_col=weights,
        epsilon=1.0,
        max_multiplicity=10,
        norm=max_norm,
        dims=None,
        estimated_corr=true_corr,
    )

    # The last row/column corresponds to 'ones': NaN placement must match.
    assert (np.isnan(corr_value[-1]) == np.isnan(true_corr[-1])).all()
    assert (np.isnan(corr_value[:, -1]) == np.isnan(true_corr[:, -1])).all()


def test_corr_identical_columns(ops_data, admin_cols):
    """Two identical columns must yield a correlation matrix of ones
    (up to the DP noise, averaged over several runs)."""
    public, user_col, weights = admin_cols
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[['integer', public, user_col, weights]].copy()
    data['integer_bis'] = data['integer']
    data_cols = ['integer', 'integer_bis']
    data[data_cols] = data[data_cols].astype(float)

    true_corr = data[data_cols].corr().to_numpy()
    # (a redundant second astype(float) was removed here — it was a no-op)
    data[data_cols] = data[data_cols] - data[data_cols].mean()
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    # Average several independent DP runs to reduce the noise.
    experiments = 50

    corr_value = np.mean(
        [
            corr(
                clipped_data,
                data_cols=data_cols,
                user_col=user_col,
                private_col=public,
                weight_col=weights,
                epsilon=5.0,
                max_multiplicity=10,
                norm=max_norm,
                dims=None,
            )
            for _ in range(experiments)
        ],
        axis=0,
    )

    assert pytest.approx(corr_value, 0.1) == true_corr


def test_noisy_cov_big_epsilon(ops_data, admin_cols):
    """With a huge epsilon (effectively no noise) every intermediate
    quantity of the DP covariance pipeline must match its exact
    counterpart, and rescaling must recover the true covariance."""
    public, user_col, weights = admin_cols
    data_cols = ['integer_pv', 'integer', 'bool']
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()
    admin_data = data[[public, user_col, weights]]
    data[data_cols] = data[data_cols].astype(float)

    true_cov = data[data_cols].cov().to_numpy()
    n = len(data)

    # CLIPPING: divide each row by max(row_norm, max_norm) so every
    # clipped row has L2 norm <= 1 (percentile 100 == the max norm).
    row_norms = np.linalg.norm(data[data_cols], axis=1)
    max_norm = np.percentile(row_norms, 100)
    clipped_data = data[data_cols] / np.maximum(row_norms, max_norm)[:, None]
    clipped_data = clipped_data.join(admin_data)

    # Exact quantities used to compare intermediate results.
    clipped_cov = clipped_data[data_cols].cov().to_numpy()
    X = clipped_data[data_cols].to_numpy()
    C = X.T.dot(X)  # exact scatter matrix
    avgs_clipped = clipped_data[data_cols].mean().to_numpy()
    mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])

    assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8
    epsilon = 1e9

    # comparable with C
    noisy_c = noisy_c_from_covariance_eig(
        array=X,
        weights=clipped_data[weights].to_numpy(),
        epsilon=epsilon,
        random_state=1,
    )
    assert np.isclose(noisy_c, C, atol=1e-2).all()

    # comparable with clipped_data.cov(): de-center and divide by n - 1
    nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
    assert np.isclose(nc_noisy_c, clipped_cov, atol=1e-5).all()

    # comparable with true_cov: undo the clipping scale
    noisy_cov_ = nc_noisy_c * (max_norm**2)
    assert np.isclose(noisy_cov_, true_cov, atol=2).all()


def test_noisy_cov_big_epsilon_clipping_50(ops_data, admin_cols):
    """Clipping at the 50th percentile distorts the data: even with a
    huge epsilon, the rescaled covariance must NOT match the true one."""
    public, user_col, weights = admin_cols
    data_cols = ['integer_pv', 'integer', 'bool']
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()
    data[data_cols] = data[data_cols].astype(float)

    true_cov = data[data_cols].cov().to_numpy()
    n = len(data)

    # CLIPPING at the median row norm: half of the rows get clipped.
    clipped_data, max_norm = scale_clip_data(data, data_cols, 50, admin_cols)

    # Exact quantities used to compare intermediate results.
    clipped_cov = clipped_data[data_cols].cov().to_numpy()
    X = clipped_data[data_cols].to_numpy()
    C = X.T.dot(X)  # exact scatter matrix
    avgs_clipped = clipped_data[data_cols].mean().to_numpy()
    mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])

    assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8

    epsilon = 1e9

    # comparable with C
    noisy_c = noisy_c_from_covariance_eig(
        X,
        weights=clipped_data[weights].to_numpy(),
        epsilon=epsilon,
        random_state=1,
    )
    assert np.isclose(noisy_c, C, atol=1e-2).all()

    # comparable with clipped_data.cov(): de-center and divide by n - 1
    nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
    assert np.isclose(nc_noisy_c, clipped_cov, atol=1e-5).all()

    # NOT comparable with true_cov: the aggressive clipping biased it
    unscaled_noisy_c = nc_noisy_c * (max_norm**2)
    assert not np.isclose(unscaled_noisy_c, true_cov, atol=2).all()


def test_noisy_cov_small_epsilon(ops_data, admin_cols):
    """With a realistic epsilon, averaged noisy quantities should match
    the exact ones only on the dominant (largest) entry, within the
    stated tolerances."""
    data_cols = ['integer_pv', 'integer', 'bool']
    public, user_col, weights = admin_cols
    # Copy the fixture slice before mutating it in place
    # (avoids SettingWithCopyWarning / cross-test contamination).
    data = ops_data[[*data_cols, public, user_col, weights]].copy()
    data[data_cols] = data[data_cols].astype(float)

    true_cov = data[data_cols].cov().to_numpy()
    n = len(data)

    n_experiments = 50
    epsilon = 2.0

    # CLIPPING
    clipped_data, max_norm = scale_clip_data(data, data_cols, 100, admin_cols)

    # Exact quantities used to compare intermediate results.
    clipped_cov = clipped_data[data_cols].cov().to_numpy()
    X = clipped_data[data_cols].to_numpy()
    C = X.T.dot(X)  # exact scatter matrix
    avgs_clipped = clipped_data[data_cols].mean().to_numpy()
    mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])

    assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8

    # Average several noisy draws to reduce the DP noise.
    noisy_cs = [
        noisy_c_from_covariance_eig(
            X,
            weights=clipped_data[weights].to_numpy(),
            epsilon=epsilon,
            random_state=None,
        )
        for _ in range(n_experiments)
    ]
    noisy_c = np.mean(noisy_cs, axis=0)
    noisy_c_max_value = noisy_c[1][1]
    # (was recomputing np.mean(noisy_cs, axis=0) a second time — redundant)
    assert noisy_c_max_value == noisy_c.max()
    # compare only the max value: C[1][1] = 312..
    # this is quite high with respect to the other terms:
    # the second column `integer` has values from 1 to 999
    assert np.isclose(noisy_c_max_value, C.max(), atol=10)

    nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
    # clipped_cov.max() = 0.084
    assert np.isclose(nc_noisy_c[1][1], clipped_cov.max(), atol=0.01)

    # comparable with true_cov
    unscaled_noisy_cov = nc_noisy_c * (max_norm**2)
    # true_cov.max() = 83787.
    assert np.isclose(unscaled_noisy_cov[1][1], true_cov.max(), rtol=0.1)


def test_noisy_cov_non_correlated_cols(admin_cols):
    """Independent synthetic columns: the averaged noisy scatter/cov must
    match the exact ones on the diagonal; off-diagonal terms are ~0 plus
    noise, so only the diagonals are compared.

    NOTE(review): tolerances appear tuned to the module-level RNG seed
    and the exact sequence of random draws; code left untouched.
    """
    public, user_col, weights = admin_cols
    data = generate_correlated_dataframe(num_samples=1000, correlation=0.0)
    data_cols = ["X", "Y"]
    true_cov = data.cov().to_numpy()
    n = len(data)
    # Attach the administrative columns required by the pipeline:
    # all-public rows, one user per row, unit weights.
    data[public] = False
    data[user_col] = np.arange(n)
    data[weights] = 1.0

    clipped_data, max_norm = scale_clip_data(
        data, data_cols=data_cols, quantile_max_norm=100, admin_cols=admin_cols
    )

    # computing some quantities used to compare intermediate results
    clipped_cov = clipped_data[data_cols].cov().to_numpy()
    X = clipped_data[data_cols].to_numpy()
    C = X.T.dot(X)
    avgs_clipped = clipped_data[data_cols].mean().to_numpy()
    mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])

    # clipping must guarantee row norms <= 1 (up to float rounding)
    assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8

    epsilon = 2.0
    n_experiments = 50

    # comparable with C
    noisy_cs = [
        noisy_c_from_covariance_eig(
            X,
            weights=clipped_data[weights].to_numpy(),
            epsilon=epsilon,
            random_state=None,
        )
        for _ in range(n_experiments)
    ]
    # take the average of observations
    noisy_c = np.mean(noisy_cs, axis=0)

    # comparing only diagonal terms (around 70).
    # the off-diagonal terms are close to zero with some noise
    # np.diag(C): [74.41819068, 69.59160969]
    assert np.isclose(np.diag(noisy_c), np.diag(C), atol=4).all()

    # comparable with clipped_data.cov()
    nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
    # np.diag(clipped_cov): [0.0743364 , 0.06946103]
    assert np.isclose(
        np.diag(nc_noisy_c), np.diag(clipped_cov), atol=0.005
    ).all()

    # comparable with true_cov
    noisy_cov_ = nc_noisy_c * (max_norm**2)

    # np.diag(true_cov): [0.97520967, 0.91125022])
    assert np.isclose(np.diag(noisy_cov_), np.diag(true_cov), atol=0.5).all()


def test_noisy_cov_fully_correlated_cols(admin_cols):
    """Two identical synthetic columns: all four entries of the scatter,
    covariance and correlation matrices are equal, so the full matrices
    are compared (not just the diagonals).

    NOTE(review): tolerances appear tuned to the module-level RNG seed
    and the exact sequence of random draws; code left untouched.
    """
    # Y is an exact copy of X: perfectly correlated columns.
    x = np.random.normal(0, 1, 1000)
    y = x
    data = pd.DataFrame({'X': x, 'Y': y})
    data_cols = ["X", "Y"]
    true_cov = data.cov().to_numpy()
    true_corr = data.corr().to_numpy()
    n = len(data)
    public, user_col, weights = admin_cols
    # Attach the administrative columns required by the pipeline:
    # all-public rows, one user per row, unit weights.
    data[public] = False
    data[user_col] = np.arange(n)
    data[weights] = 1.0

    clipped_data, max_norm = scale_clip_data(
        data, data_cols=data_cols, quantile_max_norm=100, admin_cols=admin_cols
    )

    # computing some quantities used to compare intermediate results
    clipped_cov = clipped_data[data_cols].cov().to_numpy()
    X = clipped_data[data_cols].to_numpy()
    C = X.T.dot(X)
    avgs_clipped = clipped_data[data_cols].mean().to_numpy()
    mean_matrix = avgs_clipped[:, None].dot(avgs_clipped[None, :])

    # clipping must guarantee row norms <= 1 (up to float rounding)
    assert np.linalg.norm(clipped_data[data_cols], axis=1).max() <= 1.0 + 1e-8

    epsilon = 2.0
    n_experiments = 50

    # comparable with C
    noisy_cs = [
        noisy_c_from_covariance_eig(
            X,
            weights=clipped_data[weights].to_numpy(),
            epsilon=epsilon,
            random_state=None,
        )
        for _ in range(n_experiments)
    ]
    # take the average of observations
    noisy_c = np.mean(noisy_cs, axis=0)

    # C[0][0]: 52.60717426 (all elements are the same)
    assert np.isclose(noisy_c, C, atol=10).all()

    # comparable with clipped_data.cov()
    nc_noisy_c = (noisy_c - n * mean_matrix) / (n - 1)
    # clipped_cov[0][0]: 0.05254936
    assert np.isclose(nc_noisy_c, clipped_cov, atol=0.01).all()

    # comparable with true_cov
    noisy_cov_ = nc_noisy_c * (max_norm**2)

    # true_cov[0][0]: 0.97520967
    assert np.isclose(noisy_cov_, true_cov, atol=0.25).all()
    # the correlation derived from the noisy covariance must also match
    assert np.isclose(corr_from_cov(noisy_cov_), true_corr, atol=0.2).all()