Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
Size: Mime:
from __future__ import annotations

import typing as t

from sarus_differential_privacy.query import (
    ComposedQuery,
    LaplaceQuery,
    PrivateQuery,
)
from sarus_statistics.ops.bounds.local import automatic_column_range_pandas
from sarus_statistics.ops.utils import generator_from_seed
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc

from sarus_data_spec.constants import (
    DATA,
    OPTIONAL_VALUE,
    PUBLIC,
    TEXT_MAX_LENGTH,
    TEXT_MIN_LENGTH,
    PU_COLUMN,
    WEIGHTS,
)
from sarus_data_spec.sarus_statistics.protobuf import bounds_pb2 as sbs
from sarus_data_spec.sarus_statistics.tasks.base import (
    BaseStatisticsParameters,
    StatisticsParameters,
    get_is_public,
)
from sarus_data_spec.sarus_statistics.tasks.bounds.visitor_sql import (
    get_bounds,
)
from sarus_data_spec.sarus_statistics.tasks.utils import (
    ADMIN_COLS,
    split_arrow_iterator_admin_data,
    str_date_to_int,
    str_datetime_to_int,
    str_time_to_int,
)
import sarus_data_spec.protobuf as sp
import sarus_data_spec.statistics as sds
import sarus_data_spec.type as sdt
import sarus_data_spec.typing as st

# Target probability of "no false positive" handed to the automatic range
# estimator (``prob_no_false_positive``); leaves a failure budget of 1e-9.
FALSE_POSITIVE_PROB: float = 1 - 1e-9


class BoundsParameters(BaseStatisticsParameters):
    """Parameters of a bounds (min/max) statistics computation.

    Wraps a ``sbs.BoundsParameters`` protobuf. The active variant, as
    reported by ``self.type()``, is ``"none"`` (nothing to compute),
    ``"minmax"`` (a leaf carrying a noise level and ``has_min``/``has_max``
    flags) or a composed variant whose ``children`` map names to nested
    bounds parameters.
    """

    def __init__(self, protobuf: sbs.BoundsParameters):
        self._protobuf = protobuf

    def protobuf(self) -> sbs.BoundsParameters:
        """Return the wrapped protobuf message."""
        return self._protobuf

    def private_query(self) -> PrivateQuery:
        """Return the privacy query spent when computing these bounds.

        Falls back to an empty ``ComposedQuery`` when there is nothing to
        account for.
        """
        query = self._private_query()
        if query:
            return query
        return ComposedQuery([])

    def _private_query(self) -> t.Optional[PrivateQuery]:
        """Build the privacy query, or ``None`` for the ``"none"`` variant.

        A ``"minmax"`` leaf costs one Laplace query with its noise level;
        any other variant composes the (truthy) queries of its children.
        """
        if self.type() == "none":
            return None
        elif self.type() == "minmax":
            return LaplaceQuery(self.noise())

        subqueries = []
        for child in self.children().values():
            subquery = child._private_query()
            if subquery:
                subqueries.append(subquery)
        return ComposedQuery(subqueries)

    def has_min(self) -> bool:
        """Return the ``has_min`` flag of a ``"minmax"`` leaf.

        Raises:
            ValueError: If the variant is not ``"minmax"``.
        """
        if self.type() == "minmax":
            return self.protobuf().minmax.has_min
        raise ValueError('Only Minmax has "has_min"')

    def has_max(self) -> bool:
        """Return the ``has_max`` flag of a ``"minmax"`` leaf.

        Raises:
            ValueError: If the variant is not ``"minmax"``.
        """
        if self.type() == "minmax":
            return self.protobuf().minmax.has_max
        raise ValueError('Only Minmax has "has_max"')

    def set_noise(self, noise: float) -> StatisticsParameters:
        """Set ``noise`` in place on every ``"minmax"`` leaf and return self.

        A ``"composed"`` variant forwards the value to all its children;
        other variants are left unchanged.
        """
        if self.type() == "minmax":
            self.protobuf().minmax.noise = noise
        elif self.type() == "composed":
            for child in self.children().values():
                child.set_noise(noise)
        return self

    def noise(self) -> float:
        """Return the noise level of a ``"minmax"`` leaf.

        Raises:
            ValueError: If the variant is not ``"minmax"``.
        """
        if self.type() == "minmax":
            return self.protobuf().minmax.noise
        raise ValueError('Only Minmax has "noise"')

    def prototype(self) -> t.Type[sbs.BoundsParameters]:
        """Return the type of the underlying protobuf."""
        return sbs.BoundsParameters

    def children(self) -> t.Dict[str, BoundsParameters]:
        """Return the child parameters, wrapped and sorted by key.

        Leaf variants (``"none"`` and ``"minmax"``) have no children.
        """
        if self.type() in ("none", "minmax"):
            return {}
        return {
            key: BoundsParameters(child)
            for key, child in sorted(
                getattr(self.protobuf(), self.type()).children.items()
            )
        }

    def compute(self, dataset: st.Dataset) -> st.Statistics:
        """Compute bounds for ``dataset``.

        Datasets flagged as big data by the manager go through the SQL
        visitor; others are computed in memory.
        """
        if dataset.manager().is_big_data(dataset):
            return self._compute_big_data(dataset)
        return self._compute_small_data(dataset)

    def _compute_big_data(self, dataset: st.Dataset) -> st.Statistics:
        return get_bounds(self, dataset)

    def _compute_small_data(self, dataset: st.Dataset) -> st.Statistics:
        """Compute bounds in memory from the dataset's Arrow data."""
        size = dataset.size()
        # The size statistics are required; check for them before paying
        # for the full Arrow materialisation below.
        assert size
        rng = generator_from_seed(self.random_seed())
        data = split_arrow_iterator_admin_data(dataset.to_arrow())
        base_type = dataset.schema().type().children()[DATA]
        is_public = base_type.properties()[PUBLIC] == "True"
        return _compute_visitor(
            self,
            base_type,
            data[DATA],
            data[ADMIN_COLS],
            size.statistics(),
            is_public,
            rng,
            mask=None,
        )


def _compute_visitor(
    bounds_object: BoundsParameters,
    _type: st.Type,
    values: pa.Array,
    admin: pa.Array,
    size: st.Statistics,
    is_public: bool,
    random_generator: np.random.Generator,
    mask: t.Optional[pa.Array] = None,
) -> st.Statistics:
    """Compute bounds statistics for ``values`` by visiting ``_type``.

    Integer, Float, Text (lengths), Datetime, Time, Date and Duration get
    min/max bounds:

    - taken from ``possible_values`` when the type declares some;
    - computed exactly from the data when ``is_public`` is true;
    - estimated privately with ``automatic_column_range_pandas`` otherwise.

    Null, Unit, Boolean, Id, Enum and Bytes only carry size and
    multiplicity. Struct, Union and Optional recurse into their children.

    Args:
        bounds_object: Bounds parameters matching ``_type``; children are
            looked up by field name for composed types.
        _type: The type of ``values``.
        values: The column to bound.
        admin: Admin columns aligned row-by-row with ``values``. They are
            used as a table (``column_names``, ``append_column``) and, in
            private computations, merged with the data. Presumably they
            contain the ``PU_COLUMN``/``WEIGHTS`` columns read by the
            estimator — verify against callers.
        size: Size statistics matching ``_type``.
        is_public: Whether ``values`` may be bounded without privacy noise.
        random_generator: Randomness source for private estimates.
        mask: Optional boolean array selecting the rows to consider.

    Returns:
        The bounds statistics for ``_type``.

    Raises:
        NotImplementedError: For List, Array, Constrained and Hypothesis.
    """

    class BoundsComputation(st.TypeVisitor):
        """Visitor storing the bounds of the visited type in ``self.bounds``.

        Reads ``values``, ``admin``, ``size``, ``is_public``, ``mask`` and
        ``_type`` from the enclosing ``_compute_visitor`` call.
        """

        def __init__(
            self,
            bounds_params: BoundsParameters,
        ):
            self.bounds_params = bounds_params

        def _masked_pandas(self, array: pa.Array) -> pd.Series:
            """Return ``array`` as pandas, restricted to ``mask`` if set."""
            if mask is not None:
                array = array.filter(mask)
            return array.to_pandas()

        def _flattened_admin_frame(self, data: pa.Array) -> pd.DataFrame:
            """Return the flattened admin columns plus ``data`` as ``DATA``.

            Rows are restricted to ``mask`` if set.
            """
            table = pa.Table.from_arrays(
                [*admin.flatten(), data],
                names=[*admin.column_names, DATA],
            )
            if mask is not None:
                table = table.filter(mask)
            return table.to_pandas()

        def _appended_admin_frame(self, data: pa.Array) -> pd.DataFrame:
            """Return the admin table with ``data`` appended as ``DATA``.

            Rows are restricted to ``mask`` if set. NOTE(review): unlike
            ``_flattened_admin_frame``, ``admin`` is not flattened here —
            confirm this difference is intended.
            """
            table = admin.append_column(DATA, data)
            if mask is not None:
                table = table.filter(mask)
            return table.to_pandas()

        def _private_bounds(
            self,
            data: pd.DataFrame,
            column_name: str,
            column_type: st.Type,
            estimate: t.Tuple[t.Optional[float], t.Optional[float]],
            random_generator: t.Optional[np.random.Generator],
        ) -> t.Tuple[float, float]:
            """Privately estimate the (min, max) of ``data[column_name]``.

            Delegates to ``automatic_column_range_pandas`` with the noise of
            these parameters, ``size.multiplicity()`` as maximal
            multiplicity and ``estimate`` as a prior range.
            """
            return automatic_column_range_pandas(
                data=data,
                data_col=column_name,
                user_col=PU_COLUMN,
                private_col=PUBLIC,
                weight_col=WEIGHTS,
                dtype=sdt.to_numeric_string(column_type),
                noise=self.bounds_params.noise(),
                prob_no_false_positive=FALSE_POSITIVE_PROB,
                max_multiplicity=size.multiplicity(),
                estimate=estimate,
                random_generator=random_generator,
            )

        def _public_bounds(self, data: pd.Series) -> t.Tuple[float, float]:
            """Return the exact (min, max) of public ``data``."""
            return min(data), max(data)

        def Null(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            self.bounds = sds.Null(
                size=size.size(),
                multiplicity=size.multiplicity(),
            )

        def Unit(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            self.bounds = sds.Unit(
                size=size.size(),
                multiplicity=size.multiplicity(),
            )

        def Boolean(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            self.bounds = sds.Boolean(
                size=size.size(),
                multiplicity=size.multiplicity(),
            )

        def Id(
            self,
            unique: bool,
            reference: t.Optional[st.Path] = None,
            base: t.Optional[st.IdBase] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            self.bounds = sds.Id(
                size=size.size(),
                multiplicity=size.multiplicity(),
            )

        def Integer(
            self,
            min: int,
            max: int,
            base: st.IntegerBase,
            possible_values: t.Iterable[int],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            if len(t.cast(list, possible_values)) > 0:
                self.bounds = sds.Integer(
                    min_value=int(np.min(list(possible_values))),
                    max_value=int(np.max(list(possible_values))),
                    size=size.size(),
                    multiplicity=size.multiplicity(),
                )
                return

            if is_public:
                min_value, max_value = self._public_bounds(
                    self._masked_pandas(values)
                )
            else:
                data_pd = self._flattened_admin_frame(values)
                data_pd[DATA] = data_pd[DATA].astype("int64")
                # The type's declared (min, max) serves as prior estimate.
                min_value, max_value = self._private_bounds(
                    data_pd,
                    DATA,
                    _type,
                    (min, max),
                    random_generator,
                )

            self.bounds = sds.Integer(
                size=size.size(),
                multiplicity=size.multiplicity(),
                min_value=int(min_value),
                max_value=int(max_value),
            )

        def Enum(
            self,
            name: str,
            name_values: t.Sequence[t.Tuple[str, int]],
            ordered: bool,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            self.bounds = sds.Enum(
                size=size.size(),
                multiplicity=size.multiplicity(),
            )

        def Float(
            self,
            min: float,
            max: float,
            base: st.FloatBase,
            possible_values: t.Iterable[float],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            if len(t.cast(list, possible_values)) > 0:
                self.bounds = sds.Float(
                    min_value=float(np.min(list(possible_values))),
                    max_value=float(np.max(list(possible_values))),
                    size=size.size(),
                    multiplicity=size.multiplicity(),
                )
                return

            if is_public:
                min_value, max_value = self._public_bounds(
                    self._masked_pandas(values)
                )
            else:
                data_pd = self._flattened_admin_frame(values)
                min_value, max_value = self._private_bounds(
                    data_pd,
                    DATA,
                    _type,
                    (min, max),
                    random_generator,
                )

            # Cast like the other visitors so callers always get builtins.
            self.bounds = sds.Float(
                size=size.size(),
                multiplicity=size.multiplicity(),
                min_value=float(min_value),
                max_value=float(max_value),
            )

        def Text(
            self,
            encoding: str,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            # Text bounds are bounds on the string lengths.
            if len(t.cast(list, possible_values)) > 0:
                lengths = [len(value) for value in possible_values]
                self.bounds = sds.Text(
                    min_value=int(np.min(lengths)),
                    max_value=int(np.max(lengths)),
                    size=size.size(),
                    multiplicity=size.multiplicity(),
                )
                return

            input_values = pc.utf8_length(values)
            if is_public:
                min_value, max_value = self._public_bounds(
                    self._masked_pandas(input_values)
                )
            else:
                # Length hints are optional properties: a missing key must
                # fall back to the default instead of raising KeyError.
                props = properties or {}
                min_length = props.get(TEXT_MIN_LENGTH)
                max_length = props.get(TEXT_MAX_LENGTH)
                min_estimate = int(min_length) if min_length else 0
                max_estimate = int(max_length) if max_length else None
                data_pd = self._flattened_admin_frame(input_values)
                min_value, max_value = self._private_bounds(
                    data_pd,
                    DATA,
                    sdt.Type(sp.Type(integer=sp.Type.Integer())),
                    (min_estimate, max_estimate),
                    random_generator,
                )

            self.bounds = sds.Text(
                size=size.size(),
                multiplicity=size.multiplicity(),
                min_value=int(min_value),
                max_value=int(max_value),
            )

        def Bytes(
            self, properties: t.Optional[t.Mapping[str, str]] = None
        ) -> None:
            self.bounds = sds.Bytes(
                size=size.size(),
                multiplicity=size.multiplicity(),
            )

        def Struct(
            self,
            fields: t.Mapping[str, st.Type],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            # Flatten once rather than once per field.
            children_values = values.flatten()
            # NOTE(review): every field's is_public is derived from the
            # struct's own properties, whereas Union reads each field's
            # properties — confirm this is intended.
            self.bounds = sds.Struct(
                name=name,
                fields={
                    key: _compute_visitor(
                        self.bounds_params.children()[key],
                        field,
                        children_values[values.type.get_field_index(key)],
                        admin,
                        size.children()[key],
                        is_public=get_is_public(properties),
                        random_generator=random_generator,
                        mask=mask,
                    )
                    for key, field in fields.items()
                },
                size=size.size(),
                multiplicity=size.multiplicity(),
            )

        def Union(
            self,
            fields: t.Mapping[str, st.Type],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            assert name
            new_fields: t.Dict[str, st.Statistics] = {}
            field_selected_idx = values.type.get_field_index("field_selected")
            exploded_list = values.flatten()
            field_selected = exploded_list[field_selected_idx]
            for field_name, field_type in fields.items():
                # Only rows whose "field_selected" names this field count.
                filter_index = pc.equal(field_selected, field_name)
                if mask is None:
                    input_mask = filter_index
                else:
                    input_mask = pc.and_(mask, filter_index)
                new_fields[field_name] = _compute_visitor(
                    self.bounds_params.children()[field_name],
                    field_type,
                    exploded_list[values.type.get_field_index(field_name)],
                    admin,
                    size.children()[field_name],
                    field_type.properties()[PUBLIC] == "True",
                    random_generator=random_generator,
                    mask=input_mask,
                )
            self.bounds = sds.Union(
                name=name,
                fields=new_fields,
                size=size.size(),
                multiplicity=size.multiplicity(),
            )

        def Optional(
            self,
            type: st.Type,
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            assert name
            # The wrapped type is bounded on non-null rows only.
            valid = values.is_valid()
            if mask is None:
                input_mask = valid
            else:
                input_mask = pc.and_(mask, valid)
            self.bounds = sds.Optional(
                name=name,
                statistics=_compute_visitor(
                    self.bounds_params.children()[OPTIONAL_VALUE],
                    type,
                    values,
                    admin,
                    size.children()[OPTIONAL_VALUE],
                    is_public,
                    random_generator=random_generator,
                    mask=input_mask,
                ),
                size=size.size(),
                multiplicity=size.multiplicity(),
            )

        def List(
            self,
            type: st.Type,
            max_size: int,
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            raise NotImplementedError

        def Array(
            self,
            type: st.Type,
            shape: t.Tuple[int, ...],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            raise NotImplementedError

        def Datetime(
            self,
            format: str,
            min: str,
            max: str,
            base: st.DatetimeBase,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            if len(t.cast(list, possible_values)) > 0:
                unit = (
                    "ms"
                    if base.value == st.DatetimeBase.INT64_MS.value
                    else "ns"
                )
                string_arrow = pa.array(possible_values)
                # Parse the first 19 characters, i.e. up to the seconds.
                vals = pc.strptime(
                    pc.utf8_slice_codeunits(string_arrow, 0, 19),
                    format=format,
                    unit=unit,
                )
                # NOTE(review): only the first value's length is checked, and
                # the digits after position 20 are added as nanoseconds
                # verbatim. This assumes 9 fractional digits. With a "ms" base
                # the sum may also be promoted to ns before the int64 cast —
                # confirm against the expected encoding.
                if len(string_arrow.take([0]).to_pylist()[0]) > 19:
                    extra_ns = (
                        pc.utf8_slice_codeunits(
                            string_arrow, start=20, stop=30
                        )
                        .cast(pa.int64())
                        .cast(pa.duration("ns"))
                    )
                    vals = pc.add(vals, extra_ns)
                self.bounds = sds.Datetime(
                    min_value=pc.min(vals).cast(pa.int64()).as_py(),
                    max_value=pc.max(vals).cast(pa.int64()).as_py(),
                    size=size.size(),
                    multiplicity=size.multiplicity(),
                )
                return

            if is_public:
                pandas_public = self._masked_pandas(values)
                if base.value == sp.Type.Datetime.STRING:
                    values_public = pandas_public.astype(
                        "datetime64[ns]"
                    ).astype(np.int64)
                else:
                    values_public = pandas_public.astype(np.int64)
                min_value, max_value = self._public_bounds(values_public)
            else:
                estimate = (
                    str_datetime_to_int(min, format, base),
                    str_datetime_to_int(max, format, base),
                )
                merged = self._appended_admin_frame(values)
                # Bounds are computed on the int64 representation.
                if base.value == sp.Type.Datetime.STRING:
                    merged[DATA] = (
                        merged[DATA].astype("datetime64[ns]").astype(np.int64)
                    )
                else:
                    merged[DATA] = merged[DATA].astype(np.int64)
                min_value, max_value = self._private_bounds(
                    merged,
                    DATA,
                    sdt.Type(sp.Type(integer=sp.Type.Integer())),
                    estimate,
                    random_generator,
                )

            self.bounds = sds.Datetime(
                size=size.size(),
                multiplicity=size.multiplicity(),
                min_value=int(min_value),
                max_value=int(max_value),
            )

        def Time(
            self,
            format: str,
            min: str,
            max: str,
            base: st.TimeBase,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            if base == st.TimeBase.INT64_NS:
                time_base = pa.time64("ns")
            elif base == st.TimeBase.INT32_MS:
                time_base = pa.time32("ms")
            else:
                time_base = pa.time64("us")

            if len(t.cast(list, possible_values)) > 0:
                vals = pa.array(
                    pd.to_datetime(list(possible_values)),
                    type=time_base,
                )
                self.bounds = sds.Time(
                    size=size.size(),
                    multiplicity=size.multiplicity(),
                    min_value=pc.min(vals).cast(pa.int64()).as_py(),
                    max_value=pc.max(vals).cast(pa.int64()).as_py(),
                )
                return

            # TODO: check that values are in the correct arrow timebase
            # before conversion to int64
            input_values = values.cast(pa.int64())

            if is_public:
                # Bound the int64 representation: converting raw Arrow time
                # values to pandas yields ``datetime.time`` objects, which the
                # ``int(...)`` conversion below cannot handle.
                min_value, max_value = self._public_bounds(
                    self._masked_pandas(input_values)
                )
            else:
                estimate = (
                    str_time_to_int(min, format, base),
                    str_time_to_int(max, format, base),
                )
                merged = self._appended_admin_frame(input_values)
                min_value, max_value = self._private_bounds(
                    merged,
                    DATA,
                    sdt.Type(sp.Type(integer=sp.Type.Integer())),
                    estimate,
                    random_generator,
                )

            self.bounds = sds.Time(
                size=size.size(),
                multiplicity=size.multiplicity(),
                min_value=int(min_value),
                max_value=int(max_value),
            )

        def Date(
            self,
            format: str,
            min: str,
            max: str,
            base: st.DateBase,
            possible_values: t.Iterable[str],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            if len(t.cast(list, possible_values)) > 0:
                vals = pa.array(
                    pd.to_datetime(list(possible_values), format=format),
                    pa.date32(),
                )
                self.bounds = sds.Date(
                    min_value=pc.min(vals).cast(pa.int32()).as_py(),
                    max_value=pc.max(vals).cast(pa.int32()).as_py(),
                    size=size.size(),
                    multiplicity=size.multiplicity(),
                )
                return

            # Dates are bounded on their int32 representation.
            input_values = pc.cast(values, pa.int32())
            if is_public:
                min_value, max_value = self._public_bounds(
                    self._masked_pandas(input_values)
                )
            else:
                merged = self._appended_admin_frame(input_values)
                estimate = (
                    str_date_to_int(min, format, base),
                    str_date_to_int(max, format, base),
                )
                min_value, max_value = self._private_bounds(
                    merged,
                    DATA,
                    sdt.Type(sp.Type(integer=sp.Type.Integer())),
                    estimate,
                    random_generator,
                )

            self.bounds = sds.Date(
                size=size.size(),
                multiplicity=size.multiplicity(),
                min_value=int(min_value),
                max_value=int(max_value),
            )

        def Duration(
            self,
            unit: str,
            min: int,
            max: int,
            possible_values: t.Iterable[int],
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            if len(t.cast(list, possible_values)) > 0:
                self.bounds = sds.Duration(
                    min_value=int(np.min(list(possible_values))),
                    max_value=int(np.max(list(possible_values))),
                    size=size.size(),
                    multiplicity=size.multiplicity(),
                )
                return

            input_values = values.cast(pa.int64())
            if is_public:
                min_value, max_value = self._public_bounds(
                    self._masked_pandas(input_values)
                )
            else:
                merged = self._appended_admin_frame(input_values)
                min_value, max_value = self._private_bounds(
                    merged,
                    DATA,
                    sdt.Type(sp.Type(integer=sp.Type.Integer())),
                    (min, max),
                    random_generator,
                )

            self.bounds = sds.Duration(
                size=size.size(),
                multiplicity=size.multiplicity(),
                min_value=int(min_value),
                max_value=int(max_value),
            )

        def Constrained(
            self,
            type: st.Type,
            constraint: st.Predicate,
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            raise NotImplementedError

        def Hypothesis(
            self,
            *types: t.Tuple[st.Type, float],
            name: t.Optional[str] = None,
            properties: t.Optional[t.Mapping[str, str]] = None,
        ) -> None:
            raise NotImplementedError

    visitor = BoundsComputation(bounds_object)
    _type.accept(visitor)
    return visitor.bounds