Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
from __future__ import annotations
import typing as t
from sarus_differential_privacy.query import (
ComposedQuery,
LaplaceQuery,
PrivateQuery,
)
from sarus_statistics.ops.bounds.local import automatic_column_range_pandas
from sarus_statistics.ops.utils import generator_from_seed
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
from sarus_data_spec.constants import (
DATA,
OPTIONAL_VALUE,
PUBLIC,
TEXT_MAX_LENGTH,
TEXT_MIN_LENGTH,
PU_COLUMN,
WEIGHTS,
)
from sarus_data_spec.sarus_statistics.protobuf import bounds_pb2 as sbs
from sarus_data_spec.sarus_statistics.tasks.base import (
BaseStatisticsParameters,
StatisticsParameters,
get_is_public,
)
from sarus_data_spec.sarus_statistics.tasks.bounds.visitor_sql import (
get_bounds,
)
from sarus_data_spec.sarus_statistics.tasks.utils import (
ADMIN_COLS,
split_arrow_iterator_admin_data,
str_date_to_int,
str_datetime_to_int,
str_time_to_int,
)
import sarus_data_spec.protobuf as sp
import sarus_data_spec.statistics as sds
import sarus_data_spec.type as sdt
import sarus_data_spec.typing as st
# Probability forwarded as `prob_no_false_positive` to
# `automatic_column_range_pandas` when private bounds are estimated.
# NOTE(review): despite its name, the constant is consumed as the probability
# of *no* false positive - confirm the intended meaning before renaming it.
FALSE_POSITIVE_PROB = 1.0 - 1e-9
class BoundsParameters(BaseStatisticsParameters):
    """Wrapper around an ``sbs.BoundsParameters`` protobuf.

    The wrapped message is either of type ``"none"``, ``"minmax"`` (a leaf
    carrying ``noise``, ``has_min`` and ``has_max``) or a container type whose
    ``children`` map holds nested bounds parameters.
    """

    def __init__(self, protobuf: sbs.BoundsParameters):
        self._protobuf = protobuf

    def protobuf(self) -> sbs.BoundsParameters:
        """Return the wrapped protobuf message."""
        return self._protobuf

    def private_query(self) -> PrivateQuery:
        """Return the private query of these parameters.

        An empty ``ComposedQuery`` is returned when ``_private_query`` yields
        a falsy value.
        """
        return self._private_query() or ComposedQuery([])

    def _private_query(self) -> t.Optional[PrivateQuery]:
        """Build the query recursively; ``None`` for the ``"none"`` type."""
        kind = self.type()
        if kind == "none":
            return None
        if kind == "minmax":
            return LaplaceQuery(self.noise())
        # Any other type: compose the truthy queries of the children.
        child_queries = (
            child._private_query() for child in self.children().values()
        )
        return ComposedQuery([query for query in child_queries if query])

    def has_min(self) -> bool:
        """Return the ``has_min`` flag; only valid for ``"minmax"``."""
        if self.type() != "minmax":
            raise ValueError('Only Minmax has "has_min"')
        return self.protobuf().minmax.has_min

    def has_max(self) -> bool:
        """Return the ``has_max`` flag; only valid for ``"minmax"``."""
        if self.type() != "minmax":
            raise ValueError('Only Minmax has "has_max"')
        return self.protobuf().minmax.has_max

    def set_noise(self, noise: float) -> StatisticsParameters:
        """Write ``noise`` on every ``"minmax"`` leaf reachable from here.

        ``"composed"`` parameters forward the call to their children; other
        types are left untouched. Returns ``self``.
        """
        kind = self.type()
        if kind == "minmax":
            self.protobuf().minmax.noise = noise
        elif kind == "composed":
            for child in self.children().values():
                child.set_noise(noise)
        return self

    def noise(self) -> float:
        """Return the noise value; only valid for ``"minmax"``."""
        if self.type() != "minmax":
            raise ValueError('Only Minmax has "noise"')
        return self.protobuf().minmax.noise

    def prototype(self) -> t.Type[sbs.BoundsParameters]:
        """Return the class of the underlying protobuf."""
        return sbs.BoundsParameters

    def children(self) -> t.Dict[str, BoundsParameters]:
        """Return the child parameters, wrapped and ordered by sorted items.

        ``"none"`` and ``"minmax"`` parameters have no children.
        """
        kind = self.type()
        if kind in ("none", "minmax"):
            return {}
        container = getattr(self.protobuf(), kind)
        return {
            name: BoundsParameters(proto)
            for name, proto in sorted(container.children.items())
        }

    def compute(self, dataset: st.Dataset) -> st.Statistics:
        """Compute bounds, using the big-data path when the manager asks."""
        handler = (
            self._compute_big_data
            if dataset.manager().is_big_data(dataset)
            else self._compute_small_data
        )
        return handler(dataset)

    def _compute_big_data(self, dataset: st.Dataset) -> st.Statistics:
        """Delegate to ``get_bounds``."""
        return get_bounds(self, dataset)

    def _compute_small_data(self, dataset: st.Dataset) -> st.Statistics:
        """Compute bounds from the dataset's arrow data with the visitor."""
        generator = generator_from_seed(self.random_seed())
        size_stats = dataset.size()
        split = split_arrow_iterator_admin_data(dataset.to_arrow())
        assert size_stats
        data_type = dataset.schema().type().children()[DATA]
        data_is_public = data_type.properties()[PUBLIC] == "True"
        return _compute_visitor(
            self,
            data_type,
            split[DATA],
            split[ADMIN_COLS],
            size_stats.statistics(),
            data_is_public,
            generator,
            mask=None,
        )
def _compute_visitor(
bounds_object: BoundsParameters,
_type: st.Type,
values: pa.Array,
admin: pa.Array,
size: st.Statistics,
is_public: bool,
random_generator: np.random.Generator,
mask: t.Optional[pa.Array] = None,
) -> st.Statistics:
class BoundsComputation(st.TypeVisitor):
def __init__(
self,
bounds_params: BoundsParameters,
):
self.bounds_params = bounds_params
def _private_bounds(
    self,
    data: pd.DataFrame,
    column_name: str,
    type: st.Type,
    estimate: t.Tuple[t.Optional[float], t.Optional[float]],
    random_generator: t.Optional[np.random.Generator],
) -> t.Tuple[float, float]:
    """Return the (min, max) pair produced by the noisy range estimator.

    Forwards ``data`` to ``automatic_column_range_pandas`` together with the
    noise of ``self.bounds_params``, the multiplicity of the enclosing
    call's ``size`` statistics and the caller-provided ``estimate``.
    """
    numeric_dtype = sdt.to_numeric_string(type)
    noise_level = self.bounds_params.noise()
    multiplicity = size.multiplicity()
    return automatic_column_range_pandas(
        data=data,
        data_col=column_name,
        user_col=PU_COLUMN,
        private_col=PUBLIC,
        weight_col=WEIGHTS,
        dtype=numeric_dtype,
        noise=noise_level,
        prob_no_false_positive=FALSE_POSITIVE_PROB,
        max_multiplicity=multiplicity,
        estimate=estimate,
        random_generator=random_generator,
    )
def _public_bounds(self, data: pd.Series) -> t.Tuple[float, float]:
return min(data), max(data)
def Null(
    self, properties: t.Optional[t.Mapping[str, str]] = None
) -> None:
    """Visit a Null type: store statistics with size and multiplicity only."""
    common = {"size": size.size(), "multiplicity": size.multiplicity()}
    self.bounds = sds.Null(**common)
def Unit(
    self, properties: t.Optional[t.Mapping[str, str]] = None
) -> None:
    """Visit a Unit type: store statistics with size and multiplicity only."""
    size_value = size.size()
    multiplicity_value = size.multiplicity()
    self.bounds = sds.Unit(size=size_value, multiplicity=multiplicity_value)
def Boolean(
    self, properties: t.Optional[t.Mapping[str, str]] = None
) -> None:
    """Visit a Boolean type: no bounds, only size and multiplicity."""
    common = {"size": size.size(), "multiplicity": size.multiplicity()}
    self.bounds = sds.Boolean(**common)
def Id(
    self,
    unique: bool,
    reference: t.Optional[st.Path] = None,
    base: t.Optional[st.IdBase] = None,
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit an Id type: no bounds, only size and multiplicity."""
    size_value = size.size()
    multiplicity_value = size.multiplicity()
    self.bounds = sds.Id(size=size_value, multiplicity=multiplicity_value)
def Integer(
    self,
    min: int,
    max: int,
    base: st.IntegerBase,
    possible_values: t.Iterable[int],
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit an Integer type and store its bounds in ``self.bounds``.

    * Non-empty ``possible_values``: the bounds are their min and max.
    * Otherwise, when the enclosing call is public: exact min/max of the
      (masked) column.
    * Otherwise: noisy bounds from ``_private_bounds`` with ``(min, max)``
      as the estimate.

    ``values``, ``admin``, ``mask``, ``size``, ``is_public``, ``_type`` and
    ``random_generator`` are taken from the enclosing ``_compute_visitor``
    call.
    """
    # Materialise once: the annotation only promises an Iterable, which
    # supports neither len() nor repeated iteration in general.
    candidates = list(possible_values)
    if candidates:
        self.bounds = sds.Integer(
            min_value=int(np.min(candidates)),
            max_value=int(np.max(candidates)),
            size=size.size(),
            multiplicity=size.multiplicity(),
        )
        return

    if is_public:
        selected = values if mask is None else values.filter(mask)
        min_value, max_value = self._public_bounds(selected.to_pandas())
    else:
        table = pa.Table.from_arrays(
            [*admin.flatten(), values],
            names=[*admin.column_names, DATA],
        )
        if mask is not None:
            table = table.filter(mask)
        data_pd = table.to_pandas()
        data_pd[DATA] = data_pd[DATA].astype("int64")
        min_value, max_value = self._private_bounds(
            data_pd,
            DATA,
            _type,
            (min, max),
            random_generator,
        )
    self.bounds = sds.Integer(
        size=size.size(),
        multiplicity=size.multiplicity(),
        min_value=int(min_value),
        max_value=int(max_value),
    )
def Enum(
    self,
    name: str,
    name_values: t.Sequence[t.Tuple[str, int]],
    ordered: bool,
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit an Enum type: no bounds, only size and multiplicity."""
    common = {"size": size.size(), "multiplicity": size.multiplicity()}
    self.bounds = sds.Enum(**common)
def Float(
    self,
    min: float,
    max: float,
    base: st.FloatBase,
    possible_values: t.Iterable[float],
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Float type and store its bounds in ``self.bounds``.

    * Non-empty ``possible_values``: the bounds are their min and max.
    * Otherwise, when the enclosing call is public: exact min/max of the
      (masked) column.
    * Otherwise: noisy bounds from ``_private_bounds`` with ``(min, max)``
      as the estimate.

    ``values``, ``admin``, ``mask``, ``size``, ``is_public``, ``_type`` and
    ``random_generator`` are taken from the enclosing ``_compute_visitor``
    call.
    """
    # Materialise once: the annotation only promises an Iterable, which
    # supports neither len() nor repeated iteration in general.
    candidates = list(possible_values)
    if candidates:
        self.bounds = sds.Float(
            min_value=float(np.min(candidates)),
            max_value=float(np.max(candidates)),
            size=size.size(),
            multiplicity=size.multiplicity(),
        )
        return

    if is_public:
        selected = values if mask is None else values.filter(mask)
        min_value, max_value = self._public_bounds(selected.to_pandas())
    else:
        table = pa.Table.from_arrays(
            [*admin.flatten(), values],
            names=[*admin.column_names, DATA],
        )
        if mask is not None:
            table = table.filter(mask)
        min_value, max_value = self._private_bounds(
            table.to_pandas(),
            DATA,
            _type,
            (min, max),
            random_generator,
        )
    self.bounds = sds.Float(
        size=size.size(),
        multiplicity=size.multiplicity(),
        min_value=min_value,
        max_value=max_value,
    )
def Text(
    self,
    encoding: str,
    possible_values: t.Iterable[str],
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Text type and store bounds on the text *length*.

    * Non-empty ``possible_values``: the bounds are the shortest and longest
      ``len(value)``.
    * Otherwise the lengths come from ``utf8_length`` of the column: exact
      min/max when the enclosing call is public, else noisy bounds from
      ``_private_bounds``. The estimate is read from the
      ``TEXT_MIN_LENGTH`` / ``TEXT_MAX_LENGTH`` properties and defaults to
      ``(0, None)`` when they are absent or empty.

    ``values``, ``admin``, ``mask``, ``size``, ``is_public`` and
    ``random_generator`` are taken from the enclosing ``_compute_visitor``
    call.
    """
    # Materialise once: the annotation only promises an Iterable, which
    # supports neither len() nor repeated iteration in general.
    candidates = list(possible_values)
    if candidates:
        lengths = [len(value) for value in candidates]
        # int(...) keeps the stored bounds plain Python ints, as every
        # other visit method does, instead of numpy scalars.
        self.bounds = sds.Text(
            min_value=int(np.min(lengths)),
            max_value=int(np.max(lengths)),
            size=size.size(),
            multiplicity=size.multiplicity(),
        )
        return

    input_values = pc.utf8_length(values)
    if is_public:
        selected = (
            input_values if mask is None else input_values.filter(mask)
        )
        min_value, max_value = self._public_bounds(selected.to_pandas())
    else:
        # .get(): a properties mapping without the length keys must fall
        # back to the defaults rather than raise KeyError.
        raw_min = properties.get(TEXT_MIN_LENGTH) if properties else None
        raw_max = properties.get(TEXT_MAX_LENGTH) if properties else None
        min_estimate = int(raw_min) if raw_min else 0
        max_estimate = int(raw_max) if raw_max else None

        table = pa.Table.from_arrays(
            [*admin.flatten(), input_values],
            names=[*admin.column_names, DATA],
        )
        if mask is not None:
            table = table.filter(mask)
        min_value, max_value = self._private_bounds(
            table.to_pandas(),
            DATA,
            sdt.Type(sp.Type(integer=sp.Type.Integer())),
            (min_estimate, max_estimate),
            random_generator,
        )
    self.bounds = sds.Text(
        size=size.size(),
        multiplicity=size.multiplicity(),
        min_value=int(min_value),
        max_value=int(max_value),
    )
def Bytes(
    self, properties: t.Optional[t.Mapping[str, str]] = None
) -> None:
    """Visit a Bytes type: no bounds, only size and multiplicity."""
    size_value = size.size()
    multiplicity_value = size.multiplicity()
    self.bounds = sds.Bytes(size=size_value, multiplicity=multiplicity_value)
def Struct(
    self,
    fields: t.Mapping[str, st.Type],
    name: t.Optional[str] = None,
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Struct type: compute bounds for every field recursively.

    Each field is visited with its own column of the struct array, its own
    child bounds parameters and child size statistics. The public flag of
    the children is ``get_is_public(properties)`` of this struct; the
    current ``mask`` is passed down unchanged.
    """
    # Loop invariants: children() re-sorts and re-wraps every child on each
    # call, and flatten() re-splits the struct array, so look them up once
    # instead of once per field.
    columns = values.flatten()
    child_params = self.bounds_params.children()
    child_sizes = size.children()

    self.bounds = sds.Struct(
        name=name,
        fields={
            key: _compute_visitor(
                child_params[key],
                field,
                columns[values.type.get_field_index(key)],
                admin,
                child_sizes[key],
                is_public=get_is_public(properties),
                random_generator=random_generator,
                mask=mask,
            )
            for key, field in fields.items()
        },
        size=size.size(),
        multiplicity=size.multiplicity(),
    )
def Union(
    self,
    fields: t.Mapping[str, st.Type],
    name: t.Optional[str] = None,
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Union type: compute bounds for every alternative.

    The ``"field_selected"`` column of the struct array says which
    alternative each row holds. Every alternative is visited with a mask
    restricted to its own rows (combined with the incoming ``mask`` if
    any), and with the public flag read from its own type properties.
    """
    assert name
    # Loop invariants: children() re-sorts and re-wraps every child on each
    # call, so look the maps up once instead of once per alternative.
    child_params = self.bounds_params.children()
    child_sizes = size.children()
    columns = values.flatten()
    field_selected = columns[values.type.get_field_index("field_selected")]

    new_fields: t.Dict[str, st.Statistics] = {}
    for field_name, field_type in fields.items():
        is_selected = pc.equal(field_selected, field_name)
        input_mask = (
            is_selected if mask is None else pc.and_(mask, is_selected)
        )
        new_fields[field_name] = _compute_visitor(
            child_params[field_name],
            field_type,
            columns[values.type.get_field_index(field_name)],
            admin,
            child_sizes[field_name],
            field_type.properties()[PUBLIC] == "True",
            random_generator=random_generator,
            mask=input_mask,
        )
    self.bounds = sds.Union(
        name=name,
        fields=new_fields,
        size=size.size(),
        multiplicity=size.multiplicity(),
    )
def Optional(
    self,
    type: st.Type,
    name: t.Optional[str] = None,
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit an Optional type: compute bounds of the wrapped type.

    The wrapped type is visited on the same ``values`` with a mask that
    keeps only valid (non-null) entries, combined with the incoming
    ``mask`` when there is one.
    """
    assert name
    not_null = values.is_valid()
    input_mask = not_null if mask is None else pc.and_(mask, not_null)
    inner_statistics = _compute_visitor(
        self.bounds_params.children()[OPTIONAL_VALUE],
        type,
        values,
        admin,
        size.children()[OPTIONAL_VALUE],
        is_public,
        random_generator=random_generator,
        mask=input_mask,
    )
    self.bounds = sds.Optional(
        name=name,
        statistics=inner_statistics,
        size=size.size(),
        multiplicity=size.multiplicity(),
    )
def List(
    self,
    type: st.Type,
    max_size: int,
    name: t.Optional[str] = None,
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a List type: bounds are not implemented for lists."""
    raise NotImplementedError()
def Array(
    self,
    type: st.Type,
    shape: t.Tuple[int, ...],
    name: t.Optional[str] = None,
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit an Array type: bounds are not implemented for arrays."""
    raise NotImplementedError()
def Datetime(
    self,
    format: str,
    min: str,
    max: str,
    base: st.DatetimeBase,
    possible_values: t.Iterable[str],
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Datetime type and store its bounds as integers.

    * Non-empty ``possible_values``: the strings are parsed with ``format``
      and the bounds are the min and max of the parsed timestamps, cast to
      int64.
    * Otherwise, when the enclosing call is public: exact min/max of the
      (masked) column converted to int64.
    * Otherwise: noisy bounds from ``_private_bounds``; the estimate is
      ``min``/``max`` converted by ``str_datetime_to_int``.

    ``values``, ``admin``, ``mask``, ``size``, ``is_public`` and
    ``random_generator`` are taken from the enclosing ``_compute_visitor``
    call.
    """
    if len(t.cast(list, possible_values)) > 0:
        # Timestamp unit follows the base: "ms" for INT64_MS, else "ns".
        unit = (
            "ms"
            if base.value == st.DatetimeBase.INT64_MS.value
            else "ns"
        )
        string_arrow = pa.array(possible_values)
        # Only the first 19 characters are parsed with `format`; anything
        # after them is handled separately below.
        vals = pc.strptime(
            pc.utf8_slice_codeunits(string_arrow, 0, 19),
            format=format,
            unit=unit,
        )
        # NOTE(review): only the FIRST value's length decides whether a
        # trailing part is parsed for all values - confirm that every
        # possible value shares the same layout.
        if len(string_arrow.take([0]).to_pylist()[0]) > 19:
            # NOTE(review): characters 20..30 are cast to an integer and
            # used as a nanosecond count as-is; this presumably assumes a
            # 9-digit fraction (".123" would count as 123 ns) - confirm.
            # NOTE(review): with unit == "ms" a "ns" duration is added to a
            # "ms" timestamp - confirm the unit of the resulting int64.
            extra_ns = (
                pc.utf8_slice_codeunits(
                    string_arrow, start=20, stop=30
                )
                .cast(pa.int64())
                .cast(pa.duration("ns"))
            )
            vals = pc.add(vals, extra_ns)
        self.bounds = sds.Datetime(
            min_value=pc.min(vals).cast(pa.int64()).as_py(),
            max_value=pc.max(vals).cast(pa.int64()).as_py(),
            size=size.size(),
            multiplicity=size.multiplicity(),
        )
    else:
        if is_public:
            # Public data: exact bounds on the (optionally masked) column.
            if mask is not None:
                pandas_public = values.filter(mask).to_pandas()
            else:
                pandas_public = values.to_pandas()
            # String-based datetimes go through datetime64[ns] before the
            # int64 conversion; other bases are converted directly.
            if base.value == sp.Type.Datetime.STRING:
                values_public = pandas_public.astype(
                    "datetime64[ns]"
                ).astype(np.int64)
            else:
                values_public = pandas_public.astype(np.int64)
            min_value, max_value = self._public_bounds(
                values_public,
            )
        else:
            # Private data: the declared min/max seed the noisy estimator.
            estimate = (
                str_datetime_to_int(min, format, base),
                str_datetime_to_int(max, format, base),
            )
            # The values are appended to the admin columns as DATA so the
            # estimator sees both in a single frame.
            if mask is not None:
                merged = (
                    admin.append_column(DATA, values)
                    .filter(mask)
                    .to_pandas()
                )
            else:
                merged = admin.append_column(DATA, values).to_pandas()
            # Same int64 conversion as in the public branch.
            if base.value == sp.Type.Datetime.STRING:
                merged[DATA] = (
                    merged[DATA]
                    .astype("datetime64[ns]")
                    .astype(np.int64)
                )
            else:
                merged[DATA] = merged[DATA].astype(np.int64)
            # The converted column is treated as a plain Integer type.
            min_value, max_value = self._private_bounds(
                merged,
                DATA,
                sdt.Type(sp.Type(integer=sp.Type.Integer())),
                estimate,
                random_generator,
            )
        self.bounds = sds.Datetime(
            size=size.size(),
            multiplicity=size.multiplicity(),
            min_value=int(min_value),
            max_value=int(max_value),
        )
def Time(
    self,
    format: str,
    min: str,
    max: str,
    base: st.TimeBase,
    possible_values: t.Iterable[str],
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Time type and store its bounds as integers.

    * Non-empty ``possible_values``: they are converted to an arrow time
      array of the unit implied by ``base`` and the bounds are its min and
      max cast to int64.
    * Otherwise the column is cast to int64: exact min/max when the
      enclosing call is public, else noisy bounds from ``_private_bounds``
      with ``min``/``max`` converted by ``str_time_to_int`` as estimate.

    ``values``, ``admin``, ``mask``, ``size``, ``is_public`` and
    ``random_generator`` are taken from the enclosing ``_compute_visitor``
    call.
    """
    if base == st.TimeBase.INT64_NS:
        time_base = pa.time64("ns")
    elif base == st.TimeBase.INT32_MS:
        time_base = pa.time32("ms")
    else:
        time_base = pa.time64("us")

    # Materialise once: the annotation only promises an Iterable, which
    # does not support len() in general.
    candidates = list(possible_values)
    if candidates:
        # NOTE(review): `format` is not passed to pd.to_datetime here,
        # unlike in Date - confirm this is intended.
        vals = pa.array(
            pd.to_datetime(candidates),
            type=time_base,
        )
        self.bounds = sds.Time(
            size=size.size(),
            multiplicity=size.multiplicity(),
            min_value=pc.min(vals).cast(pa.int64()).as_py(),
            max_value=pc.max(vals).cast(pa.int64()).as_py(),
        )
        return

    # TODO: check that values are in the correct arrow timebase
    # before conversion to int64
    input_values = values.cast(pa.int64())
    if is_public:
        # Use the int64 view, not the raw time column: the bounds are
        # passed through int(...) below, which rejects time objects.
        selected = (
            input_values if mask is None else input_values.filter(mask)
        )
        min_value, max_value = self._public_bounds(selected.to_pandas())
    else:
        estimate = (
            str_time_to_int(min, format, base),
            str_time_to_int(max, format, base),
        )
        table = admin.append_column(DATA, input_values)
        if mask is not None:
            table = table.filter(mask)
        min_value, max_value = self._private_bounds(
            table.to_pandas(),
            DATA,
            sdt.Type(sp.Type(integer=sp.Type.Integer())),
            estimate,
            random_generator,
        )
    self.bounds = sds.Time(
        size=size.size(),
        multiplicity=size.multiplicity(),
        min_value=int(min_value),
        max_value=int(max_value),
    )
def Date(
    self,
    format: str,
    min: str,
    max: str,
    base: st.DateBase,
    possible_values: t.Iterable[str],
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Date type and store its bounds as integers.

    * Non-empty ``possible_values``: they are parsed with ``format`` into
      an arrow ``date32`` array and the bounds are its min and max cast to
      int32.
    * Otherwise the column is cast to int32: exact min/max when the
      enclosing call is public, else noisy bounds from ``_private_bounds``
      with ``min``/``max`` converted by ``str_date_to_int`` as estimate.

    ``values``, ``admin``, ``mask``, ``size``, ``is_public`` and
    ``random_generator`` are taken from the enclosing ``_compute_visitor``
    call.
    """
    # Materialise once: the annotation only promises an Iterable, which
    # does not support len() in general.
    candidates = list(possible_values)
    if candidates:
        vals = pa.array(
            pd.to_datetime(candidates, format=format),
            pa.date32(),
        )
        self.bounds = sds.Date(
            min_value=pc.min(vals).cast(pa.int32()).as_py(),
            max_value=pc.max(vals).cast(pa.int32()).as_py(),
            size=size.size(),
            multiplicity=size.multiplicity(),
        )
        return

    input_values = pc.cast(values, pa.int32())
    if is_public:
        selected = (
            input_values if mask is None else input_values.filter(mask)
        )
        min_value, max_value = self._public_bounds(selected.to_pandas())
    else:
        table = admin.append_column(DATA, input_values)
        if mask is not None:
            table = table.filter(mask)
        merged = table.to_pandas()
        estimate = (
            str_date_to_int(min, format, base),
            str_date_to_int(max, format, base),
        )
        min_value, max_value = self._private_bounds(
            merged,
            DATA,
            sdt.Type(sp.Type(integer=sp.Type.Integer())),
            estimate,
            random_generator,
        )
    self.bounds = sds.Date(
        size=size.size(),
        multiplicity=size.multiplicity(),
        min_value=int(min_value),
        max_value=int(max_value),
    )
def Duration(
    self,
    unit: str,
    min: int,
    max: int,
    possible_values: t.Iterable[int],
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Duration type and store its bounds as integers.

    * Non-empty ``possible_values``: the bounds are their min and max.
    * Otherwise the column is cast to int64: exact min/max when the
      enclosing call is public, else noisy bounds from ``_private_bounds``
      with ``(min, max)`` as the estimate.

    ``values``, ``admin``, ``mask``, ``size``, ``is_public`` and
    ``random_generator`` are taken from the enclosing ``_compute_visitor``
    call.
    """
    # Materialise once: the annotation only promises an Iterable, which
    # supports neither len() nor repeated iteration in general.
    candidates = list(possible_values)
    if candidates:
        self.bounds = sds.Duration(
            min_value=int(np.min(candidates)),
            max_value=int(np.max(candidates)),
            size=size.size(),
            multiplicity=size.multiplicity(),
        )
        return

    input_values = values.cast(pa.int64())
    if is_public:
        selected = (
            input_values if mask is None else input_values.filter(mask)
        )
        min_value, max_value = self._public_bounds(selected.to_pandas())
    else:
        table = admin.append_column(DATA, input_values)
        if mask is not None:
            table = table.filter(mask)
        min_value, max_value = self._private_bounds(
            table.to_pandas(),
            DATA,
            sdt.Type(sp.Type(integer=sp.Type.Integer())),
            (min, max),
            random_generator,
        )
    self.bounds = sds.Duration(
        size=size.size(),
        multiplicity=size.multiplicity(),
        min_value=int(min_value),
        max_value=int(max_value),
    )
def Constrained(
    self,
    type: st.Type,
    constraint: st.Predicate,
    name: t.Optional[str] = None,
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Constrained type: bounds are not implemented for it."""
    raise NotImplementedError()
def Hypothesis(
    self,
    *types: t.Tuple[st.Type, float],
    name: t.Optional[str] = None,
    properties: t.Optional[t.Mapping[str, str]] = None,
) -> None:
    """Visit a Hypothesis type: bounds are not implemented for it."""
    raise NotImplementedError()
visitor = BoundsComputation(bounds_object)
_type.accept(visitor)
return visitor.bounds