Repository URL to install this package:
|
Version:
3.0.0.dev3 ▾
|
from typing import Any, List, Tuple, Type
import os
from numpy import ndarray
import numpy as np
try:
import tensorflow as tf
except ModuleNotFoundError:
raise ModuleNotFoundError(
"Please intall tensorflow if\n"
"you are planning to use Logistic and NN mergers"
)
# Helper functions
def load_model(model_name: str) -> tf.keras.Model:
    """Load a saved Keras model stored alongside this module.

    Args:
        model_name (str): name of the entry inside the
            ``tf_trained_models`` directory located next to this file.

    Returns:
        tf.keras.Model: the object returned by
            ``tf.keras.models.load_model`` for that path.
    """
    module_dir = os.path.dirname(__file__)
    model_path = os.path.join(module_dir, "tf_trained_models", model_name)
    return tf.keras.models.load_model(model_path)
class Logistic:  # pylint: disable=too-many-instance-attributes
    """Logistic model to perform the merge between DP and synthetic data.

    Model features are:
        "synth_sigma": standard deviation over a synthetic result column
        "dp_synth_bin_ratio": ratio between the number of DP and synthetic
            result lines
        "intersection_total_bin_ratio": number of common keys (groups
            present in both DP and synthetic results) over the sum of the
            DP and synthetic key counts
        "l2_on_intersecion_bins": l2 norm of the DP values on common keys
        "epsilon": target epsilon for the query
        "length": dataset length
    """

    def __init__(
        self,
        target_epsilon: float,
        ds_length: int,
        keep_float: bool,
        model_name: str = "lg-01-04-2022",
        **_kwargs: Any,
    ):
        """Constructor

        Args:
            target_epsilon (float)
            ds_length (int): data set length
            keep_float (bool): if True the returned results are floats,
                otherwise each column is cast to the type found in the
                corresponding synthetic column.
            model_name (str, optional): Defaults to "lg-01-04-2022".
            **_kwargs: accepted and ignored.
        """
        self.target_epsilon = target_epsilon
        self.ds_length = ds_length
        self.keep_float = keep_float
        self.model = load_model(model_name)
        # The attributes below are only populated by extract_features().
        self.n_columns: int
        self.dtypes: List[Type[Any]]
        self.key_index: List[int]
        self.all_groups: List[Tuple[Any, ...]]

    def merge(
        self,
        dp_keys: List[Tuple[Any, ...]],
        dp_values: List[List[Any]],
        sy_keys: List[Tuple[Any, ...]],
        sy_values: List[List[Any]],
    ) -> List[List[Any]]:
        """Dp and synthetic results are merged here.

        Args:
            dp_keys (List[Tuple[Any, ...]]):
                list containing tuples with group by values for each line.
                They are used as keys.
                e.g. given a query with 'GROUP BY sex, education_num',
                dp_keys would be: [("Male", 1), ("Female", 1)]
            dp_values (List[List[Any]]):
                list with row values from dp results. e.g. given a query
                like 'SELECT sex, education_num, AVG(age), COUNT(*) FROM
                census GROUP BY sex, education_num' dp_values would be:
                [["Male", 1, 32.3, 444], ["Female", 1, 44.4, 654]]
            sy_keys (List[Tuple[Any, ...]]): similarly as dp_keys, list
                containing tuples with group by values for each line.
            sy_values (List[List[Any]]): similarly as dp_values,
                list with row values from synthetic results.

        Returns:
            List[List[Any]]: merged results
        """
        features, dp_array, sy_array = self.extract_features(
            dp_keys, dp_values, sy_keys, sy_values
        )
        input_dict = {
            "features": features,
            "dp": dp_array,
            "synth": sy_array,
            # These two inputs are fed as zeros at prediction time.
            "y_true": np.zeros_like(sy_array),
            "norm_factor": np.zeros_like(sy_array),
        }
        res = self.model.predict(input_dict)
        return self.reformat_results(res)

    @staticmethod
    def _locate_key_columns(
        row: List[Any], key: Tuple[Any, ...]
    ) -> List[int]:
        """Return, for each key component, its column position in row.

        Each position is claimed at most once, so two key components with
        equal values resolve to two different columns.

        Raises:
            ValueError: if a key component cannot be found in row.
        """
        positions: List[int] = []
        for key_value in key:
            for pos, cell in enumerate(row):
                if pos in positions:
                    continue
                # Identity first, then equality: same rule as list.index.
                if cell is key_value or cell == key_value:
                    positions.append(pos)
                    break
            else:
                raise ValueError(f"{key_value!r} is not in list")
        return positions

    @staticmethod
    def _first_positions(keys: List[Tuple[Any, ...]]) -> dict:
        """Map every key to the index of its first occurrence in keys."""
        first_seen: dict = {}
        for pos, key in enumerate(keys):
            first_seen.setdefault(key, pos)
        return first_seen

    @staticmethod
    def _value_cells(row: List[Any], key_columns: set) -> List[Any]:
        """Return the cells of row that are not in a key column."""
        return [
            cell for pos, cell in enumerate(row) if pos not in key_columns
        ]

    def extract_features(  # pylint: disable=too-many-locals
        self,
        dp_keys: List[Tuple[Any, ...]],
        dp_values: List[List[Any]],
        sy_keys: List[Tuple[Any, ...]],
        sy_values: List[List[Any]],
    ) -> Tuple[ndarray, ndarray, ndarray]:
        """Extract computable features from dp and synthetic
        values and relative keys.

        Also records on the instance the layout needed later by
        reformat_results: key_index, dtypes, n_columns and all_groups.

        Args:
            dp_keys (List[Tuple[Any, ...]]):
                list containing tuples with group by values for each line.
                They are used as keys.
                e.g. given a query with 'GROUP BY sex, education_num',
                dp_keys would be: [("Male", 1), ("Female", 1)]
            dp_values (List[List[Any]]):
                list with row values from dp results. e.g. given a query
                like 'SELECT sex, education_num, AVG(age), COUNT(*) FROM
                census GROUP BY sex, education_num' dp_values would be:
                [["Male", 1, 32.3, 444], ["Female", 1, 44.4, 654]]
            sy_keys (List[Tuple[Any, ...]]): similarly as dp_keys, list
                containing tuples with group by values for each line.
            sy_values (List[List[Any]]): similarly as dp_values, list with
                row values from synthetic results.

        Returns:
            Tuple[ndarray, ndarray, ndarray]: the feature matrix (one row
                per output cell, one column per feature) followed by the
                DP and synthetic values flattened column by column.

        Raises:
            IndexError: if sy_values or sy_keys is empty.
            ValueError: if a key component is not found in the first
                synthetic row.
        """
        # The column layout is read from the first synthetic row; DP rows
        # are assumed to share it.
        # NOTE(review): key columns are still located by value, so a value
        # column that precedes a key column and equals its key value in the
        # first row would be mistaken for the key.
        first_row = sy_values[0]
        self.key_index = self._locate_key_columns(first_row, sy_keys[0])
        key_columns = set(self.key_index)
        self.dtypes = [
            type(cell)
            for pos, cell in enumerate(first_row)
            if pos not in key_columns
        ]
        self.n_columns = len(first_row) - len(self.key_index)
        # dict.fromkeys de-duplicates while keeping first-seen order, so
        # the output row order is reproducible across interpreter runs.
        self.all_groups = list(dict.fromkeys(dp_keys + sy_keys))

        dp_row_of = self._first_positions(dp_keys)
        sy_row_of = self._first_positions(sy_keys)

        _dp = []
        _sy = []
        _common = []  # DP values of the groups present on both sides
        for grouping_key in self.all_groups:
            dp_pos = dp_row_of.get(grouping_key)
            sy_pos = sy_row_of.get(grouping_key)
            # Key cells are stripped by column position, never by value:
            # an aggregate that happens to equal a key value must be kept.
            if dp_pos is None:
                dp_cells = [np.nan] * self.n_columns
            else:
                dp_cells = self._value_cells(dp_values[dp_pos], key_columns)
            if sy_pos is None:
                sy_cells = [np.nan] * self.n_columns
            else:
                sy_cells = self._value_cells(sy_values[sy_pos], key_columns)
            _dp.append(dp_cells)
            _sy.append(sy_cells)
            if dp_pos is not None and sy_pos is not None:
                _common.append(dp_cells)

        # Groups missing on one side contribute zeros.
        dp_array = np.nan_to_num(np.array(_dp), nan=0.0)  # type: ignore
        sy_array = np.nan_to_num(np.array(_sy), nan=0.0)  # type: ignore
        common_array = np.array(_common)

        n_rows = dp_array.shape[0]
        # Per-column statistics are repeated once per row so that they line
        # up with the column-major flattening of the value arrays.
        synth_sigma = np.repeat(np.nanstd(sy_array, axis=0), n_rows)  # type: ignore # noqa: E501
        n_cells = len(synth_sigma)
        dp_synth_bin_ratio = np.array(
            [len(dp_keys) / len(sy_keys)] * n_cells
        )
        intersection_total_bin_ratio = np.array(
            [len(common_array) / (len(dp_keys) + len(sy_keys))] * n_cells
        )
        if common_array.size == 0:
            l2_on_intersecion_bins = np.zeros_like(synth_sigma)
        else:
            # NOTE(review): this is the norm of the DP values on shared
            # keys, not a DP-vs-synthetic distance; left unchanged because
            # the trained model presumably expects this definition.
            l2_on_intersecion_bins = np.repeat(
                np.linalg.norm(common_array, axis=0),  # type: ignore
                n_rows,
            )
        epsilon = np.repeat(self.target_epsilon, n_cells)
        length = np.repeat(self.ds_length, n_cells)
        features = np.vstack(
            (
                synth_sigma,
                dp_synth_bin_ratio,
                intersection_total_bin_ratio,
                l2_on_intersecion_bins,
                epsilon,
                length,
            )
        ).T
        # log(0) and log(<0) are expected here and patched right after, so
        # the floating-point warnings they raise are silenced.
        with np.errstate(divide="ignore", invalid="ignore"):
            logged = np.log(features)
        features = np.nan_to_num(logged, neginf=-1e-2)  # type: ignore
        return (
            features,
            dp_array.flatten(order="F").astype("float64"),
            sy_array.flatten(order="F").astype("float64"),
        )

    def reformat_results(self, res: Any) -> List[List[Any]]:
        """Reformat results

        Must be called after extract_features, which records the column
        layout used here.

        Args:
            res (Any): array with dimension (1, n_lines*n_columns)

        Returns:
            List[List[Any]]: one row per group in all_groups, with key
                components placed back at their original columns and the
                merged values filling the remaining ones.
        """
        raw_results = res.reshape((-1, self.n_columns), order="F")
        if not self.keep_float:
            # Cast each column to the type of the corresponding synthetic
            # column. NOTE(review): int(...) truncates rather than rounds.
            raw_results_list = [
                [cast(value) for cast, value in zip(self.dtypes, row)]
                for row in raw_results
            ]
        else:
            raw_results_list = raw_results.tolist()

        n_total = len(self.key_index) + self.n_columns
        full_results = []
        for i, group in enumerate(self.all_groups):
            values = iter(raw_results_list[i])
            # key_index[k] is the column of the k-th key component; using
            # the mapping keeps keys in place even when the GROUP BY order
            # differs from the SELECT order.
            key_at = dict(zip(self.key_index, group))
            full_results.append(
                [
                    key_at[j] if j in key_at else next(values)
                    for j in range(n_total)
                ]
            )
        return full_results
class NN(Logistic):
    """Neural-network merger: Logistic features plus the DP noise scale.

    sigma_dp: list of lists with floats. It is read as one row per DP
    result line (same order as dp_values) and one entry per non-key
    column — presumably the noise standard deviation of each DP value;
    confirm against the caller.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        target_epsilon: float,
        sigma_dp: List[Any],  # must have the same dimensions of dp_results
        ds_length: int,
        keep_float: bool,
        model_name: str = "nn-01-04-2022",
        **_kwargs: Any,
    ) -> None:
        """Constructor

        Args:
            target_epsilon (float)
            sigma_dp (List[Any]): per-value sigma of the DP results.
            ds_length (int): data set length
            keep_float (bool): forwarded to Logistic.
            model_name (str, optional): Defaults to "nn-01-04-2022".
            **_kwargs: accepted and ignored.
        """
        super().__init__(target_epsilon, ds_length, keep_float, model_name)
        self.sigma_dp = np.array(sigma_dp)

    def merge(
        self,
        dp_keys: List[Tuple[Any, ...]],
        dp_values: List[List[Any]],
        sy_keys: List[Tuple[Any, ...]],
        sy_values: List[List[Any]],
    ) -> List[List[Any]]:
        """Merge DP and synthetic results using the NN model.

        Same contract as Logistic.merge; the only difference is that the
        DP sigma of every output cell is prepended as the first feature
        column (0 for groups that have no DP row).

        Raises:
            ValueError: if sigma_dp does not provide exactly one row of
                n_columns values per DP result line.
        """
        features, dp_array, sy_array = self.extract_features(
            dp_keys, dp_values, sy_keys, sy_values
        )
        dp_dummy = dp_array.reshape((-1, self.n_columns), order="F")

        sigma_rows = self.sigma_dp.reshape((-1, self.n_columns))
        if sigma_rows.shape[0] != len(dp_values):
            raise ValueError(
                "sigma_dp must have one row per DP result line: got "
                f"{sigma_rows.shape[0]} rows for {len(dp_values)} DP lines"
            )

        # Align sigma rows with the merged row order by key. Inferring the
        # DP rows from "value != 0" breaks when a DP value is exactly 0 and
        # ignores that all_groups is not ordered like dp_keys.
        dp_row_of: dict = {}
        for pos, key in enumerate(dp_keys):
            dp_row_of.setdefault(key, pos)
        sigma_dp = np.zeros_like(dp_dummy)
        for out_row, group in enumerate(self.all_groups):
            src_row = dp_row_of.get(group)
            if src_row is not None:
                sigma_dp[out_row] = sigma_rows[src_row]

        # Column-major flattening matches the layout of dp_array/sy_array.
        features = np.insert(features, 0, sigma_dp.flatten(order="F"), axis=1)  # type: ignore # noqa: E501
        input_dict = {
            "features": features,
            "dp": dp_array,
            "synth": sy_array,
            # These two inputs are fed as zeros at prediction time.
            "y_true": np.zeros_like(sy_array),
            "norm_factor": np.zeros_like(sy_array),
        }
        res = self.model.predict(input_dict)
        return self.reformat_results(res)