Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
from typing import Any, List, Tuple, Type
import os

from numpy import ndarray
import numpy as np

try:
    import tensorflow as tf
except ModuleNotFoundError:
    raise ModuleNotFoundError(
        "Please intall tensorflow if\n"
        "you are planning to use Logistic and NN mergers"
    )


# Helper functions
def load_model(model_name: str) -> tf.keras.Model:
    """Load a stored Keras model shipped next to this module.

    The model is looked up inside the ``tf_trained_models`` directory that
    sits in the same directory as this file.

    Args:
        model_name (str): name of the entry inside ``tf_trained_models``.

    Returns:
        tf.keras.Model: whatever ``tf.keras.models.load_model`` returns for
            that path.
    """
    models_dir = os.path.join(os.path.dirname(__file__), "tf_trained_models")
    model_path = os.path.join(models_dir, model_name)
    return tf.keras.models.load_model(model_path)


class Logistic:  # pylint: disable=too-many-instance-attributes
    """Logistic model to perform the merge between DP and synthetic data.

    Model features are:
        "synth_sigma": standard deviation over a synthetic result column
        "dp_synth_bin_ratio": ratio between dp and synthetic result lines
        "intersection_total_bin_ratio": ratio between the number of common
            keys (groups present in both DP and synthetic results) and the
            summed number of DP and synthetic keys.
        "l2_on_intersecion_bins": l2 norm computed on common keys
        "epsilon": target epsilon for the query
        "length": dataset length
    """

    def __init__(
        self,
        target_epsilon: float,
        ds_length: int,
        keep_float: bool,
        model_name: str = "lg-01-04-2022",
        **_kwargs: Any,
    ):
        """Constructor

        Args:
            target_epsilon (float): target epsilon for the query.
            ds_length (int): data set length
            keep_float (bool): if True the returned result will be float,
                else each column is cast to the type of the corresponding
                synthetic result column.
            model_name (str, optional): Defaults to "lg-01-04-2022".
            **_kwargs: accepted and ignored.
        """

        self.target_epsilon = target_epsilon
        self.ds_length = ds_length
        self.keep_float = keep_float
        self.model = load_model(model_name)
        # The attributes below are filled in by extract_features().
        self.n_columns: int
        self.dtypes: List[Type[Any]]
        self.key_index: List[int]
        self.all_groups: List[Tuple[Any, ...]]

    def merge(
        self,
        dp_keys: List[Tuple[Any, ...]],
        dp_values: List[List[Any]],
        sy_keys: List[Tuple[Any, ...]],
        sy_values: List[List[Any]],
    ) -> List[List[Any]]:
        """Dp and synthetic results are merged here.

        Args:
            dp_keys (List[Tuple[Any, ...]]):
                list containing tuples with group by values for each line.
                They are used as keys.
                e.g. given a query with 'GROUP BY sex, education_num',
                dp_keys would be :
                [("Male", 1), ("Female", 1)] where "Male"/"Female" are the
                sex values and 1 is the education_num value.

            dp_values (List[List[Any]]):
                list with row values from dp results. e.g. given a query like
                'SELECT sex, education_num, AVG(age), COUNT(*) FROM census
                GROUP BY sex, education_num' dp_values would be:
                [["Male", 1, 32.3, 444], ["Female", 1, 44.4, 654]]
            sy_keys (List[Tuple[Any, ...]]): similarly as dp_keys, list
                containing tuples with group by values for each line.
            sy_values (List[List[Any]]): similarly as dp_values,
                list with row values from synthetic results.

        Returns:
            List[List[Any]]: merged results, one row per distinct group.
        """

        features, dp_array, sy_array = self.extract_features(
            dp_keys, dp_values, sy_keys, sy_values
        )
        input_dict = {
            "features": features,
            "dp": dp_array,
            "synth": sy_array,
            # Zero placeholders with the same shape as the synthetic input.
            "y_true": np.zeros_like(sy_array),
            "norm_factor": np.zeros_like(sy_array),
        }
        res = self.model.predict(input_dict)
        return self.reformat_results(res)

    @staticmethod
    def _locate_key_columns(
        row: List[Any], key: Tuple[Any, ...]
    ) -> List[int]:
        """Return, for each value of ``key``, its column position in ``row``.

        Each key value takes the first not-yet-claimed column holding an
        equal value, so two key columns with the same value get two
        different positions.

        Raises:
            ValueError: if a key value cannot be found in ``row``.
        """
        taken: List[int] = []
        for key_value in key:
            match = next(
                (
                    pos
                    for pos, cell in enumerate(row)
                    if pos not in taken
                    and (cell is key_value or cell == key_value)
                ),
                None,
            )
            if match is None:
                raise ValueError(f"{key_value!r} is not in list")
            taken.append(match)
        return taken

    def extract_features(  # pylint: disable=too-many-locals
        self,
        dp_keys: List[Tuple[Any, ...]],
        dp_values: List[List[Any]],
        sy_keys: List[Tuple[Any, ...]],
        sy_values: List[List[Any]],
    ) -> Tuple[ndarray, ndarray, ndarray]:
        """Extract computable features from dp and synthetic
        values and relative keys.

        Besides returning the model inputs, this method stores
        ``key_index``, ``dtypes``, ``n_columns`` and ``all_groups`` on the
        instance; ``reformat_results`` relies on them.

        Args:
            dp_keys (List[Tuple[Any, ...]]):
                list containing tuples with group by values for each line.
                They are used as keys.
                e.g. given a query with 'GROUP BY sex, education_num',
                dp_keys would be :
                [("Male", 1), ("Female", 1)] where "Male"/"Female" are the
                sex values and 1 is the education_num value.

            dp_values (List[List[Any]]):
                list with row values from dp results. e.g. given a query like
                'SELECT sex, education_num, AVG(age), COUNT(*) FROM census
                GROUP BY sex, education_num' dp_values would be:
                [["Male", 1, 32.3, 444], ["Female", 1, 44.4, 654]]
            sy_keys (List[Tuple[Any, ...]]): similarly as dp_keys, list
                containing tuples with group by values for each line.
            sy_values (List[List[Any]]): similarly as dp_values, list with
                row values from synthetic results.

        Returns:
            Tuple[ndarray, ndarray, ndarray]: the log-scaled feature matrix
                (one row per result cell, six feature columns) and the
                column-major flattened dp and synthetic value arrays.
        """

        # Key columns are located from the first synthetic row only.
        # NOTE(review): this is still a value match; a non-key column that
        # precedes the key column and holds the same value in that first
        # row would be picked instead -- confirm against callers.
        first_row = sy_values[0]
        self.key_index = self._locate_key_columns(first_row, sy_keys[0])
        key_columns = set(self.key_index)
        value_columns = [
            i for i in range(len(first_row)) if i not in key_columns
        ]
        self.dtypes = [type(first_row[i]) for i in value_columns]
        self.n_columns = len(value_columns)
        # Distinct groups in first-seen order (dp first, then synthetic-only)
        # so that the output row order is reproducible between runs.
        self.all_groups = list(dict.fromkeys(list(dp_keys) + list(sy_keys)))

        # Row position of each key. Iterating in reverse makes the FIRST
        # occurrence of a duplicated key win, like list.index() would.
        dp_row_of = {
            key: pos for pos, key in reversed(list(enumerate(dp_keys)))
        }
        sy_row_of = {
            key: pos for pos, key in reversed(list(enumerate(sy_keys)))
        }

        def value_cells(row: List[Any]) -> List[Any]:
            # Select by column position: filtering by value would also drop
            # any aggregate that happens to equal one of the key values.
            return [row[i] for i in value_columns]

        missing = [np.nan] * self.n_columns
        _dp = []
        _sy = []
        _common = []
        for grouping_key in self.all_groups:
            dp_pos = dp_row_of.get(grouping_key)
            sy_pos = sy_row_of.get(grouping_key)
            dp_row = missing if dp_pos is None else value_cells(
                dp_values[dp_pos]
            )
            sy_row = missing if sy_pos is None else value_cells(
                sy_values[sy_pos]
            )
            if dp_pos is not None and sy_pos is not None:
                # Group present on both sides: keep its dp values.
                _common.append(dp_row)
            _dp.append(dp_row)
            _sy.append(sy_row)

        # Groups missing on one side contribute zeros on that side.
        dp_array = np.nan_to_num(np.array(_dp), nan=0.0)  # type: ignore
        sy_array = np.nan_to_num(np.array(_sy), nan=0.0)  # type: ignore
        common_array = np.array(_common)

        # Every per-column statistic is repeated once per row so that the
        # feature rows line up with the column-major flattened values.
        n_rows = dp_array.shape[0]
        synth_sigma = np.repeat(np.nanstd(sy_array, axis=0), n_rows)  # type: ignore # noqa: E501
        n_cells = len(synth_sigma)
        dp_synth_bin_ratio = np.array([len(dp_keys) / len(sy_keys)] * n_cells)
        intersection_total_bin_ratio = np.array(
            [len(common_array) / (len(dp_keys) + len(sy_keys))] * n_cells
        )
        if common_array.size == 0:
            l2_on_intersecion_bins = np.zeros_like(synth_sigma)
        else:
            # NOTE(review): this is the norm of the dp values on the common
            # groups, not a dp-vs-synthetic distance; left unchanged because
            # the stored model is fed this exact quantity.
            l2_on_intersecion_bins = np.repeat(
                np.linalg.norm(common_array, axis=0),  # type: ignore
                n_rows,
            )
        epsilon = np.repeat(self.target_epsilon, n_cells)
        length = np.repeat(self.ds_length, n_cells)
        features = np.vstack(
            (
                synth_sigma,
                dp_synth_bin_ratio,
                intersection_total_bin_ratio,
                l2_on_intersecion_bins,
                epsilon,
                length,
            )
        ).T
        # log(0) = -inf is replaced by -1e-2; NaN (log of a negative value)
        # becomes 0 through nan_to_num's default.
        features = np.nan_to_num(np.log(features), neginf=-1e-2)  # type: ignore # noqa: E501
        return (
            features,
            dp_array.flatten(order="F").astype("float64"),
            sy_array.flatten(order="F").astype("float64"),
        )

    def reformat_results(self, res: Any) -> List[List[Any]]:
        """Reformat results

        Reshapes the model output into rows of value columns and re-inserts
        the group-by values at the column positions recorded in
        ``key_index`` by ``extract_features``.

        Args:
            res (Any): array with dimension (1, n_lines*n_columns)

        Returns:
            List[List[Any]]: return outcome with the right format, one row
                per entry of ``all_groups``.
        """
        raw_results = res.reshape((-1, self.n_columns), order="F")

        if self.keep_float:
            raw_results_list = raw_results.tolist()
        else:
            # Cast every column to the type of the corresponding synthetic
            # column, then transpose the columns back into rows.
            typed_columns = [
                list(map(dtype, raw_results[:, i]))
                for i, dtype in enumerate(self.dtypes)
            ]
            raw_results_list = [list(elem) for elem in zip(*typed_columns)]

        width = len(self.key_index) + self.n_columns
        full_results = []
        for i, group in enumerate(self.all_groups):
            # Map column position -> key value so each key lands in its own
            # column even when the key order differs from the column order.
            key_at = dict(zip(self.key_index, group))
            row = iter(raw_results_list[i])
            full_results.append(
                [key_at[j] if j in key_at else next(row) for j in range(width)]
            )
        return full_results


class NN(Logistic):
    """Merger that feeds the DP sigmas to the model as an extra feature.

    It reuses the feature extraction and result formatting of ``Logistic``
    and prepends one feature column built from ``sigma_dp``.

    sigma_dp: list of lists with floats. NOTE(review): assumed to hold one
    row per dp result row and one entry per non-key column -- confirm
    against callers.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        target_epsilon: float,
        sigma_dp: List[Any],  # must have the same dimensions of dp results
        ds_length: int,
        keep_float: bool,
        model_name: str = "nn-01-04-2022",
        **_kwargs: Any,
    ) -> None:
        """Constructor

        Args:
            target_epsilon (float): target epsilon for the query.
            sigma_dp (List[Any]): sigmas associated with the dp results.
            ds_length (int): data set length
            keep_float (bool): forwarded to ``Logistic``.
            model_name (str, optional): Defaults to "nn-01-04-2022".
            **_kwargs: accepted and ignored.
        """
        super().__init__(target_epsilon, ds_length, keep_float, model_name)
        self.sigma_dp = np.array(sigma_dp)

    def merge(
        self,
        dp_keys: List[Tuple[Any, ...]],
        dp_values: List[List[Any]],
        sy_keys: List[Tuple[Any, ...]],
        sy_values: List[List[Any]],
    ) -> List[List[Any]]:
        """Merge dp and synthetic results using the sigma-aware model.

        Args:
            dp_keys (List[Tuple[Any, ...]]): group by values of each dp row.
            dp_values (List[List[Any]]): row values from dp results.
            sy_keys (List[Tuple[Any, ...]]): group by values of each
                synthetic row.
            sy_values (List[List[Any]]): row values from synthetic results.

        Returns:
            List[List[Any]]: merged results, one row per distinct group.
        """
        features, dp_array, sy_array = self.extract_features(
            dp_keys, dp_values, sy_keys, sy_values
        )
        dp_dummy = dp_array.reshape((-1, self.n_columns), order="F")
        sigma_dp = np.zeros_like(dp_dummy)

        if self.sigma_dp.size == len(dp_keys) * self.n_columns:
            # One sigma row per dp row: place each row on the line of its
            # own group. The rows of dp_dummy follow self.all_groups, which
            # is not guaranteed to follow the order of dp_keys, and a
            # genuine dp value of 0 must still receive its sigma.
            sigma_rows = self.sigma_dp.reshape(
                (len(dp_keys), self.n_columns)
            )
            row_of_group = {
                group: row for row, group in enumerate(self.all_groups)
            }
            placed = set()
            for key, sigma_row in zip(dp_keys, sigma_rows):
                if key in placed:
                    # extract_features keeps the first row of a duplicated
                    # key only; do the same for its sigmas.
                    continue
                placed.add(key)
                sigma_dp[row_of_group[key]] = sigma_row
        else:
            # Legacy behaviour for any other sigma shape: fill the non-zero
            # dp cells in row-major order.
            sigma_dp[dp_dummy != 0] = self.sigma_dp.flatten()

        # The sigma column goes first, flattened column-major like the
        # dp/synth arrays so that rows stay aligned.
        features = np.insert(features, 0, sigma_dp.flatten(order="F"), axis=1)  # type: ignore # noqa: E501
        input_dict = {
            "features": features,
            "dp": dp_array,
            "synth": sy_array,
            # Zero placeholders with the same shape as the synthetic input.
            "y_true": np.zeros_like(sy_array),
            "norm_factor": np.zeros_like(sy_array),
        }
        res = self.model.predict(input_dict)
        return self.reformat_results(res)