Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import typing as t

import pyarrow as pa

from sarus_statistics.ops.links.local import links as private_links

from sarus_data_spec.constants import (
    OPTIONAL_VALUE,
    PUBLIC,
    PU_COLUMN,
    WEIGHTS,
)
from sarus_data_spec.links import link_statistics, links
from sarus_data_spec.path import Path, path
from sarus_data_spec.sarus_statistics.protobuf.links_pb2 import (
    RandomLinksParameters,
)
from sarus_data_spec.sarus_statistics.tasks.base import get_is_public
from sarus_data_spec.sarus_statistics.tasks.links.visitor_foreign_keys import (
    column_data_as_pandas,
)
from sarus_data_spec.sarus_statistics.tasks.marginals.utils import (
    update_quantiles,
)
import sarus_data_spec.protobuf as sp
import sarus_data_spec.statistics as sds
import sarus_data_spec.typing as sdty

# Probability constant just below 1 (1 - 1e-9).
# NOTE(review): not referenced in the visible part of this module; presumably
# consumed elsewhere -- confirm its role before changing the value.
FALSE_POSITIVE_PROB = 1.0 - 1e-9
# String identifier "links_statistics".
# NOTE(review): not referenced in the visible part of this module; presumably
# used as a task/attribute name by callers -- confirm before renaming.
LINKS_STATISTICS = "links_statistics"


# pylint: disable=too-many-locals
def links_statistics(
    synthetic_dataset: sdty.Dataset, links_dp_params: RandomLinksParameters
) -> sdty.Links:
    """Compute link statistics for the parent of a synthetic dataset.

    The argument is the synthetic dataset; the statistics are computed on
    its single parent dataset. For every (pointing, pointed) pair in the
    parent's foreign keys, quantiles of the links are estimated with
    `private_links`, turned into an integer distribution and wrapped in a
    link statistics object.

    Noise from `links_dp_params` is applied unless both the pointing and the
    pointed structs are flagged public, in which case the noise is 0.

    Args:
        synthetic_dataset: dataset whose unique parent is analysed.
        links_dp_params: provides `noise`, `nb_quantiles` and
            `isotonic_regression` for the quantile estimation.

    Returns:
        The links object built from the parent dataset and the per-foreign-key
        link statistics.

    Raises:
        AssertionError: if the synthetic dataset does not have exactly one
            dataset parent, or if the parent has no marginals.
    """
    parent_datasets, _ = synthetic_dataset.parents()
    assert len(parent_datasets) == 1
    dataset = t.cast(sdty.Dataset, parent_datasets[0])

    schema = dataset.schema()
    foreign_keys = dataset.foreign_keys()
    record_batches = list(dataset.to_arrow())
    table = pa.Table.from_batches(record_batches)
    marginals = dataset.marginals()
    assert marginals
    stats = marginals.statistics()
    data_type = schema.data_type()

    collected = []
    for fk_path, pk_path in foreign_keys.items():
        # Locate the foreign key type below the dataset's root label and
        # extract the matching column data from the arrow table.
        schema_type = schema.type()
        fk_location = Path(
            sp.Path(label=dataset.name(), paths=[fk_path.protobuf()])
        )
        fk_type = schema_type.get(fk_location)
        fk_data = column_data_as_pandas(fk_type, table)

        fk_stats = stats.nodes_statistics(fk_path)[0]
        pk_stats = stats.nodes_statistics(pk_path)[0]

        # The dataframe column is named after the last path element, except
        # for optional values where it is the second to last one.
        name_chain = fk_path.to_strings_list()[0]
        column = name_chain[-1]
        if column == OPTIONAL_VALUE:
            column = name_chain[-2]

        # Noise is skipped only when both ends of the link are public.
        fk_struct = path(paths=data_type.get(fk_path).structs())
        pk_struct = path(paths=(data_type.get(pk_path).structs()))
        both_public = get_is_public(
            data_type.sub_types(fk_struct)[0].properties()
        ) and get_is_public(data_type.sub_types(pk_struct)[0].properties())
        noise = 0 if both_public else links_dp_params.noise

        quantiles = private_links(
            data=fk_data,
            col_name=column,
            user_col=PU_COLUMN,
            weight_col=WEIGHTS,
            private_col=PUBLIC,
            noise=noise,
            mult_fk=fk_stats.multiplicity(),
            mult_pk=pk_stats.multiplicity(),
            size_fk=fk_stats.size(),
            size_pk=pk_stats.size(),
            nb_quantiles=links_dp_params.nb_quantiles,
            isotonic_regression=links_dp_params.isotonic_regression,
            iota=1e-3,
            peid_only=True,
        )

        # set diracs
        quantile_values = list(quantiles.values())
        quantile_probs = list(quantiles.keys())
        probabilities, values = update_quantiles(
            quantiles=quantile_values,
            probabilities=quantile_probs,
            is_int=True,
        )

        int_values = [int(value) for value in values]
        distribution = sds.Integer_Distribution(
            min_value=int_values[0],
            max_value=int_values[-1],
            probabilities=probabilities,
            values=int_values,
        )
        collected.append(
            link_statistics(
                pointing=fk_path,
                pointed=pk_path,
                distribution=distribution,
            )
        )

    return links(dataset=dataset, links_statistics=collected)