Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
import typing as t
import pyarrow as pa
from sarus_statistics.ops.links.local import links as private_links
from sarus_data_spec.constants import (
OPTIONAL_VALUE,
PUBLIC,
PU_COLUMN,
WEIGHTS,
)
from sarus_data_spec.links import link_statistics, links
from sarus_data_spec.path import Path, path
from sarus_data_spec.sarus_statistics.protobuf.links_pb2 import (
RandomLinksParameters,
)
from sarus_data_spec.sarus_statistics.tasks.base import get_is_public
from sarus_data_spec.sarus_statistics.tasks.links.visitor_foreign_keys import (
column_data_as_pandas,
)
from sarus_data_spec.sarus_statistics.tasks.marginals.utils import (
update_quantiles,
)
import sarus_data_spec.protobuf as sp
import sarus_data_spec.statistics as sds
import sarus_data_spec.typing as sdty
# Module-level constants.
# NOTE(review): neither constant is referenced by the code visible in this
# module; presumably they are consumed by importers -- TODO confirm before
# removing or renaming.
# Value just below 1 (1 - 1e-9); the name suggests a probability bound.
FALSE_POSITIVE_PROB = 1.0 - 1e-9
# String identifier "links_statistics"; presumably a task/attribute name.
LINKS_STATISTICS = "links_statistics"
# pylint: disable=too-many-locals
def links_statistics(
    synthetic_dataset: sdty.Dataset, links_dp_params: RandomLinksParameters
) -> sdty.Links:
    """Build the link statistics of the parent of a synthetic dataset.

    The argument is the synthetic dataset; all computations are run on its
    single parent. For every (pointing, pointed) pair returned by the
    parent's ``foreign_keys()``, quantiles are obtained from
    ``private_links`` and turned into an integer distribution.

    Args:
        synthetic_dataset: Dataset whose unique parent provides the schema,
            foreign keys, arrow data and marginals used here.
        links_dp_params: Provides ``noise``, ``nb_quantiles`` and
            ``isotonic_regression``, which are forwarded to
            ``private_links``.

    Returns:
        The result of ``links(...)`` for the parent dataset, holding one
        link statistic per foreign key.

    Raises:
        AssertionError: If the synthetic dataset does not have exactly one
            parent, or if the parent's marginals are falsy.
    """
    parent_datasets, _ = synthetic_dataset.parents()
    assert len(parent_datasets) == 1
    source = t.cast(sdty.Dataset, parent_datasets[0])

    source_schema = source.schema()
    foreign_keys = source.foreign_keys()
    record_batches = list(source.to_arrow())
    arrow_table = pa.Table.from_batches(record_batches)
    marginals = source.marginals()
    assert marginals
    stats = marginals.statistics()
    root_type = source_schema.data_type()

    collected = []
    for pointing_path, pointed_path in foreign_keys.items():
        # Locate the foreign-key column type under the dataset-name label
        # and extract its data as pandas.
        labelled_path = Path(
            sp.Path(label=source.name(), paths=[pointing_path.protobuf()])
        )
        fk_type = source_schema.type().get(labelled_path)
        fk_frame = column_data_as_pandas(fk_type, arrow_table)

        fk_stats = stats.nodes_statistics(pointing_path)[0]
        pk_stats = stats.nodes_statistics(pointed_path)[0]

        # The dataframe column is named after the last path element, except
        # when that element is the optional marker: then it is the second
        # to last one.
        name_parts = pointing_path.to_strings_list()[0]
        column_name = (
            name_parts[-2]
            if name_parts[-1] == OPTIONAL_VALUE
            else name_parts[-1]
        )

        # Noise is skipped only when both enclosing structs are public.
        pointing_struct = path(paths=root_type.get(pointing_path).structs())
        pointed_struct = path(paths=(root_type.get(pointed_path).structs()))
        both_public = get_is_public(
            root_type.sub_types(pointing_struct)[0].properties()
        ) and get_is_public(
            root_type.sub_types(pointed_struct)[0].properties()
        )
        noise = 0 if both_public else links_dp_params.noise

        quantile_map = private_links(
            data=fk_frame,
            col_name=column_name,
            user_col=PU_COLUMN,
            weight_col=WEIGHTS,
            private_col=PUBLIC,
            noise=noise,
            mult_fk=fk_stats.multiplicity(),
            mult_pk=pk_stats.multiplicity(),
            size_fk=fk_stats.size(),
            size_pk=pk_stats.size(),
            nb_quantiles=links_dp_params.nb_quantiles,
            isotonic_regression=links_dp_params.isotonic_regression,
            iota=1e-3,
            peid_only=True,
        )

        # Set diracs: the mapping keys are passed as probabilities and the
        # mapping values as quantiles.
        probabilities, values = update_quantiles(
            quantiles=list(quantile_map.values()),
            probabilities=list(quantile_map.keys()),
            is_int=True,
        )
        int_values = [int(value) for value in values]
        distribution = sds.Integer_Distribution(
            min_value=int_values[0],
            max_value=int_values[-1],
            probabilities=probabilities,
            values=int_values,
        )

        collected.append(
            link_statistics(
                pointing=pointing_path,
                pointed=pointed_path,
                distribution=distribution,
            )
        )

    return links(dataset=source, links_statistics=collected)