Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
# pylint: disable=redefined-builtin, too-many-lines, too-few-public-methods

from __future__ import annotations

import enum
import math
from typing import Iterable, List, Optional, Sequence, Tuple, Union, cast
from uuid import UUID, uuid4

from sqlalchemy import Enum, ForeignKey, Integer, String
from sqlalchemy.orm import Query, backref, mapped_column, relationship
from sqlalchemy.orm.session import Session

from sarus_differential_privacy import analysis
from sarus_differential_privacy.protobuf.private_query_pb2 import (
    PrivateQuery as ProtoPrivateQuery,
)
from sarus_differential_privacy.sample import Sample
from sarus_differential_privacy.utils import Base, Json


# pylint: disable=invalid-name
class PrivateQueryStatus(enum.Enum):
    """Enum of Private queries' statuses
    Queries are inactive before being registered and are executed then."""

    inactive = 0
    executed = 1
    waived = 2


# pylint: disable=invalid-name
class PrivateQueryType(enum.Enum):
    """Enum of Private queries' types."""

    base_private_query = 0
    composed_query = 1
    sampled_query = 2
    gaussian_query = 3
    laplace_query = 4
    randomized_response_query = 5
    epsilon_query = 6
    sgm_query = 7
    epsilon_delta_query = 8
    multiple_query = 9


class SQLPrivateQuery(Base):
    """SQL equivalent of BasePrivateQuery."""

    __tablename__ = "query"
    __table_args__ = {"extend_existing": True}

    id = mapped_column(Integer, primary_key=True)
    uuid = mapped_column(String, unique=True)
    type = mapped_column(Enum(PrivateQueryType))
    status = mapped_column(Enum(PrivateQueryStatus))
    parameters = mapped_column(Json(128))
    rdp_id = relationship("RDP")
    supquery_id = mapped_column(Integer, ForeignKey(id, onupdate="cascade"))
    subqueries = relationship(
        "SQLPrivateQuery",
        cascade="all, delete-orphan",
        backref=backref("supquery", remote_side=id),
        collection_class=list,
        enable_typechecks=False,
    )
    sample = relationship("SQLSample")
    sample_id = mapped_column(Integer, ForeignKey("sample.id"))

    __mapper_args__ = {
        'polymorphic_on': type,
    }

    def private_query(self) -> BasePrivateQuery:
        """Return the corresponding BasePrivateQuery object"""

        if self.type == PrivateQueryType.composed_query:
            query: BasePrivateQuery = ComposedQuery(
                [q.private_query() for q in self.subqueries]
            )
        if self.type == PrivateQueryType.sampled_query:
            query = SampledQuery(
                list(self.subqueries)[0].private_query(),
                self.sample.sample(),
            )
        if self.type == PrivateQueryType.gaussian_query:
            query = GaussianQuery(  # pylint: disable=redefined-variable-type
                **self.parameters
            )
        if self.type == PrivateQueryType.laplace_query:
            query = LaplaceQuery(**self.parameters)
        if self.type == PrivateQueryType.randomized_response_query:
            query = RandomizedResponseQuery(**self.parameters)
        if self.type == PrivateQueryType.epsilon_query:
            query = EpsilonQuery(**self.parameters)
        if self.type == PrivateQueryType.sgm_query:
            query = SampledGaussianMechanismQuery(**self.parameters)
        if self.type == PrivateQueryType.epsilon_delta_query:
            query = EpsilonDeltaQuery(**self.parameters)
        if self.type == PrivateQueryType.multiple_query:
            query = MultipleQuery(
                list(self.subqueries)[0].private_query(), **self.parameters
            )
        query._uuid = UUID(self.uuid)  # pylint: disable=protected-access
        query.status = self.status
        return query


# pylint: disable=unnecessary-pass
class PrivateQuery:
    """Abstract PrivateQuery object"""

    @property
    def type(self) -> PrivateQueryType:
        """Type of query: gaussian, laplace, composed..."""
        pass

    @property
    def status(self) -> PrivateQueryStatus:
        """Status of the query: inactive, executed or waived"""
        pass

    @status.setter
    def status(self, value: PrivateQueryStatus) -> None:
        pass

    @property
    def parameters(self) -> Optional[dict]:
        """Parameters of the query. None for sampled and composed query"""
        pass

    @property
    def uuid(self) -> UUID:
        """Unique identifier of the query"""
        pass

    @property
    def depth(self) -> int:
        """Number of supqueries between self and root node"""
        pass

    @depth.setter
    def depth(self, value: int) -> None:
        pass

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Returns the SQLPrivateQuery equivalent of the PrivateQuery"""
        pass

    def protobuf(self) -> ProtoPrivateQuery:
        """Returns the ProtoPrivateQuery equivalent of the PrivateQuery"""
        pass

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """Function implementing the RDP consumption of the query for a given
        alpha or a list of alphas"""
        pass

    def epsilon_delta(self) -> Tuple[float, float]:
        """Returns non-rdp (epsilon, delta) pair.
        By default, it is (0, 0). To be overwritten"""
        pass

    def pure_delta(self) -> float:
        """Returns the delta part of an EpsilonDeltaQuery"""
        pass


class ComposedPrivateQuery(PrivateQuery):
    """Abstract composed private query"""

    @property
    def subqueries(self) -> Sequence[PrivateQuery]:
        """Subqueries for composed, sampled and multiple queries"""
        pass

    def all_subqueries(self) -> Sequence[PrivateQuery]:
        """Get all descendants of self including itself in a set"""
        pass


class SampledPrivateQuery(ComposedPrivateQuery):
    """Abstract sampled private query"""

    @property
    def sample(self) -> Sample:
        """Subqueries for composed, sampled and multiple queries"""
        pass


class BasePrivateQuery(PrivateQuery):
    """Base class for private queries.
    Implements the static methods"""

    def __init__(self) -> None:
        self._type = PrivateQueryType.base_private_query
        self._status = PrivateQueryStatus.inactive
        self._parameters: Optional[dict] = None
        self._uuid = uuid4()
        self.depth = 0
        self._rdp: dict[float, float] = {}

    @property
    def type(self) -> PrivateQueryType:
        return self._type

    @property
    def status(self) -> PrivateQueryStatus:
        return self._status

    @status.setter
    def status(self, value: PrivateQueryStatus) -> None:
        self._status = value

    @property
    def parameters(self) -> Optional[dict]:
        return self._parameters

    @property
    def uuid(self) -> UUID:
        return self._uuid

    @property
    def depth(self) -> int:
        return self._depth

    @depth.setter
    def depth(self, value: int) -> None:
        self._depth = value

    def epsilon_delta(self) -> Tuple[float, float]:
        """Returns non-rdp (epsilon, delta) pair.
        By default, it is (0, 0). To be overwritten"""
        return (0, 0)

    def pure_delta(self) -> float:
        return 0

    @staticmethod
    def from_sql(sql_object: SQLPrivateQuery) -> PrivateQuery:
        """Build PrivateQuery from SQLPrivateQuery object"""
        return sql_object.private_query()

    # pylint: disable=too-many-branches
    @staticmethod
    def from_protobuf(
        protobuf: Sequence[ProtoPrivateQuery],
    ) -> List['BasePrivateQuery']:
        """Build a list of PrivateQuery from a sequence of ProtoPrivateQuery
        objects"""

        queries = {}
        for q_proto in protobuf:
            if q_proto.type == PrivateQueryType.composed_query.name:
                query: BasePrivateQuery = ComposedQuery([])
            elif q_proto.type == PrivateQueryType.sampled_query.name:
                query = SampledQuery(
                    None, Sample.from_protobuf(q_proto.sample)
                )
            elif q_proto.type == PrivateQueryType.gaussian_query.name:
                query = GaussianQuery(q_proto.parameters['noise'])
            elif q_proto.type == PrivateQueryType.laplace_query.name:
                query = LaplaceQuery(q_proto.parameters['noise'])
            elif (
                q_proto.type == PrivateQueryType.randomized_response_query.name
            ):
                query = RandomizedResponseQuery(
                    q_proto.parameters['probability']
                )
            elif q_proto.type == PrivateQueryType.epsilon_query.name:
                query = EpsilonQuery(q_proto.parameters['epsilon'])
            elif q_proto.type == PrivateQueryType.sgm_query.name:
                query = SampledGaussianMechanismQuery(
                    q_proto.parameters['sampling_probability'],
                    q_proto.parameters['noise_multiplier'],
                    int(q_proto.parameters['steps']),
                )
            elif q_proto.type == PrivateQueryType.epsilon_delta_query.name:
                query = EpsilonDeltaQuery(
                    q_proto.parameters['epsilon'], q_proto.parameters['delta']
                )
            elif q_proto.type == PrivateQueryType.multiple_query.name:
                query = MultipleQuery(
                    None, int(q_proto.parameters['multiplicity'])
                )
            else:
                raise ValueError(f'type not recognized: {q_proto.type}')

            query._uuid = UUID(  # pylint: disable=protected-access
                q_proto.uuid
            )
            query.status = PrivateQueryStatus[q_proto.status]
            queries[q_proto.uuid] = (query, q_proto)

        for q, q_proto in queries.values():
            if q_proto.subqueries:
                # pylint: disable=protected-access
                q._subqueries += [  # type: ignore[attr-defined]
                    queries[subq][0] for subq in q_proto.subqueries
                ]
                subqueries = q.all_subqueries()  # type: ignore[attr-defined]
                subqueries.remove(q)
                for subq in subqueries:
                    subq.depth += 1

        return [q[0] for q in queries.values()]

    @staticmethod
    def to_protobuf(
        queries: Iterable[PrivateQuery],
    ) -> List[ProtoPrivateQuery]:
        """Convert a sequence of queries to a list of ProtoPrivateQuery
        If composed or sampled queries, all descendants must be in queries"""
        return [q.protobuf() for q in queries]


class BaseComposedPrivateQuery(BasePrivateQuery, ComposedPrivateQuery):
    """Base class for ComposedPrivateQuery"""

    def __init__(self) -> None:
        super().__init__()
        self._subqueries: List[PrivateQuery] = []

    @property
    def subqueries(self) -> List[PrivateQuery]:
        return self._subqueries

    def all_subqueries(self) -> List[PrivateQuery]:
        descendants: List[PrivateQuery] = [self]
        for q in self.subqueries:
            if isinstance(q, ComposedPrivateQuery):
                descendants += q.all_subqueries()
            else:
                descendants.append(q)
        return descendants


def remove_duplicates_preserve_order(lst: List) -> List:
    """equivalent of list(set()) but preserves order
    given in the input list"""
    seen = set()
    new_list = []
    for el in lst:
        if el not in seen:
            new_list.append(el)
            seen.add(el)
    return new_list


class ComposedQuery(BaseComposedPrivateQuery):
    """A sequence of private queries"""

    def __init__(
        self,
        queries: List[PrivateQuery],
    ) -> None:
        super().__init__()
        self._type = PrivateQueryType.composed_query
        for q in queries:
            if isinstance(q, ComposedPrivateQuery):
                for subq in q.all_subqueries():
                    subq.depth += 1
            else:
                q.depth += 1
            self._subqueries.append(q)
        self._subqueries = remove_duplicates_preserve_order(self._subqueries)

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        if not isinstance(alpha, list):
            if alpha in self._rdp:
                return self._rdp[alpha]

            rdp_alpha = sum(
                [cast(float, q.rdp(alpha)) for q in self.subqueries]
            )
            self._rdp[alpha] = rdp_alpha
            return rdp_alpha

        return [cast(float, self.rdp(a)) for a in alpha]

    def epsilon_delta(self) -> Tuple[float, float]:
        list_epsilon_delta = [q.epsilon_delta() for q in self.subqueries]
        epsilon = sum(
            [q_epsilon_delta[0] for q_epsilon_delta in list_epsilon_delta]
        )
        delta = sum(
            [q_epsilon_delta[1] for q_epsilon_delta in list_epsilon_delta]
        )
        return (epsilon, delta)

    def pure_delta(self) -> float:
        return sum([q.pure_delta() for q in self.subqueries])

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()
            if existing_query:
                return cast(SQLPrivateQuery, existing_query)

        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.composed_query,
            uuid=str(self.uuid),
            status=self.status,
            subqueries=[q.sql(session) for q in self.subqueries],
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            subqueries=[str(subq.uuid) for subq in self.subqueries],
        )


class SampledQuery(BaseComposedPrivateQuery, SampledPrivateQuery):
    """Apply a sampling `sample` on the query `query`"""

    def __init__(
        self,
        query: Optional[PrivateQuery],
        sample: Sample,
    ) -> None:
        super().__init__()
        self._type = PrivateQueryType.sampled_query
        self._sample = sample

        if query:
            if isinstance(query, ComposedPrivateQuery):
                for subq in query.all_subqueries():
                    subq.depth += 1
            else:
                query.depth += 1
            self._subqueries.append(query)

    @property
    def sample(self) -> Sample:
        return self._sample

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        sub_q = list(self.subqueries)[0]
        if not isinstance(alpha, list):
            if alpha in self._rdp:
                return self._rdp[alpha]

            rdp_alpha = self.sample.rdp(alpha, sub_q)
            self._rdp[alpha] = rdp_alpha
            return rdp_alpha

        return [cast(float, self.rdp(a)) for a in alpha]

    def epsilon_delta(self) -> Tuple[float, float]:
        sub_q = list(self.subqueries)[0]
        return self.sample.epsilon_delta(sub_q)

    def pure_delta(self) -> float:
        sub_q = list(self.subqueries)[0]
        return self.sample.pure_delta(sub_q)

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()
            if existing_query:
                return cast(SQLPrivateQuery, existing_query)

        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.sampled_query,
            uuid=str(self.uuid),
            status=self.status,
            subqueries=[q.sql(session) for q in self.subqueries],
            sample=self.sample.sql(session),
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            sample=self.sample.protobuf(),
            subqueries=[str(subq.uuid) for subq in self.subqueries],
        )


class GaussianQuery(BasePrivateQuery):
    """Gaussian query"""

    def __init__(self, noise: float):
        super().__init__()
        self._type = PrivateQueryType.gaussian_query
        self._parameters: dict = {"noise": noise}

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        if not isinstance(alpha, list):
            if alpha in self._rdp:
                return self._rdp[alpha]
            rdp_alpha = alpha / (2 * (self.parameters["noise"] ** 2))
            self._rdp[alpha] = rdp_alpha
            return rdp_alpha  # type: ignore[no-any-return]
        return [cast(float, self.rdp(a)) for a in alpha]

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()
            if existing_query:
                return cast(SQLPrivateQuery, existing_query)

        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.gaussian_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )


class LaplaceQuery(BasePrivateQuery):
    """Laplace query"""

    def __init__(self, noise: float):
        super().__init__()
        self._type = PrivateQueryType.laplace_query
        self._parameters: dict = {"noise": noise}

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        n: float = self.parameters["noise"]
        if not isinstance(alpha, list):
            if alpha in self._rdp:
                return self._rdp[alpha]

            if alpha == 1:
                rdp_alpha = max(0, 1 / n + math.exp(-1 / n) - 1)
            else:
                try:
                    rdp_alpha = max(
                        0,
                        1.0 / n
                        + math.log(
                            (
                                alpha
                                + (alpha - 1) * math.exp(-(2 * alpha - 1) / n)
                            )
                            / (2 * alpha - 1)
                        )
                        / (alpha - 1),
                    )
                except OverflowError:  # if exp too high
                    rdp_alpha = float("inf")
            self._rdp[alpha] = rdp_alpha
            return rdp_alpha

        return [cast(float, self.rdp(a)) for a in alpha]

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()
            if existing_query:
                return cast(SQLPrivateQuery, existing_query)

        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.laplace_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )


class RandomizedResponseQuery(BasePrivateQuery):
    """Randomized response query"""

    def __init__(self, probability: float):
        super().__init__()
        self._type = PrivateQueryType.randomized_response_query
        self._parameters: dict = {"probability": probability}

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        p = self.parameters["probability"]
        if not isinstance(alpha, list):
            if alpha in self._rdp:
                return self._rdp[alpha]
            if alpha == 1:
                rdp_alpha = (2 * p - 1) * math.log(p / (1 - p))
            else:
                rdp_alpha = (
                    1
                    / (alpha - 1)
                    * math.log(
                        math.pow(p, alpha) * math.pow(1 - p, 1 - alpha)
                        + math.pow(1 - p, alpha) * math.pow(p, 1 - alpha)
                    )
                )
            self._rdp[alpha] = rdp_alpha
            return rdp_alpha  # type: ignore[no-any-return]
        return [cast(float, self.rdp(a)) for a in alpha]

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()
            if existing_query:
                return cast(SQLPrivateQuery, existing_query)

        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.randomized_response_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )


class EpsilonQuery(BasePrivateQuery):
    """Pure-DP query"""

    def __init__(self, epsilon: float):
        super().__init__()
        self._type = PrivateQueryType.epsilon_query
        self._parameters: dict = {"epsilon": epsilon}

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        eps: float = self.parameters["epsilon"]
        if not isinstance(alpha, list):
            if alpha in self._rdp:
                return self._rdp[alpha]
            rdp_alpha = max(
                0,
                eps
                + (
                    math.log(
                        1
                        - math.exp(-2 * alpha * eps)
                        - math.exp(-eps)
                        + math.exp(-(2 * alpha - 1) * eps)
                    )
                    - math.log(1 - math.exp(-2 * eps))
                )
                / (alpha - 1),
            )
            self._rdp[alpha] = rdp_alpha
            return rdp_alpha
        return [cast(float, self.rdp(a)) for a in alpha]

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()
            if existing_query:
                return cast(SQLPrivateQuery, existing_query)

        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.epsilon_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )


class SampledGaussianMechanismQuery(BasePrivateQuery):
    """SGM query"""

    def __init__(
        self,
        sampling_probability: float,
        noise_multiplier: float,
        steps: int,
    ) -> None:
        super().__init__()
        self._type = PrivateQueryType.sgm_query
        self._parameters: dict = {
            "sampling_probability": sampling_probability,
            "noise_multiplier": noise_multiplier,
            "steps": steps,
        }

    @property
    def parameters(self) -> dict:
        return self._parameters

    # pylint: disable=protected-access
    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        if not isinstance(alpha, list):
            if alpha in self._rdp:
                return self._rdp[alpha]

            rdp_alpha = max(
                0,
                analysis._compute_rdp(
                    self.parameters["sampling_probability"],
                    self.parameters["noise_multiplier"],
                    alpha,
                )
                * self.parameters["steps"],
            )
            self._rdp[alpha] = rdp_alpha
            return rdp_alpha  # type: ignore[no-any-return]
        return [cast(float, self.rdp(a)) for a in alpha]

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()

            if existing_query:
                return cast(SQLPrivateQuery, existing_query)

        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.sgm_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )


class EpsilonDeltaQuery(BasePrivateQuery):
    """Generic approximate-DP query"""

    def __init__(
        self,
        epsilon: float,
        delta: float,
    ):
        super().__init__()
        self._type = PrivateQueryType.epsilon_delta_query
        self._parameters: dict = {
            "epsilon": epsilon,
            "delta": delta,
        }

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        pure_eps_query = EpsilonQuery(epsilon=self.parameters['epsilon'])
        return pure_eps_query.rdp(alpha)

    def epsilon_delta(self) -> Tuple[float, float]:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        return (self.parameters["epsilon"], self.parameters["delta"])

    def pure_delta(self) -> float:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        return cast(float, self.parameters["delta"])

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()
            if existing_query:
                return cast(SQLPrivateQuery, existing_query)

        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.epsilon_delta_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )


class MultipleQuery(BaseComposedPrivateQuery):
    """Apply several times the same query"""

    def __init__(
        self,
        query: Optional[PrivateQuery],
        multiplicity: int,
    ):
        super().__init__()
        self._type = PrivateQueryType.multiple_query
        self._parameters: dict = {
            'multiplicity': multiplicity,
        }

        if query:
            if isinstance(query, ComposedPrivateQuery):
                for subq in query.all_subqueries():
                    subq.depth += 1
            else:
                query.depth += 1
            self._subqueries.append(query)

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        sub_q = list(self.subqueries)[0]
        if not isinstance(alpha, list):
            if alpha in self._rdp:
                return self._rdp[alpha]
            rdp_alpha = cast(
                float, sub_q.rdp(alpha) * self.parameters['multiplicity']
            )
            self._rdp[alpha] = rdp_alpha
            return rdp_alpha
        return [cast(float, self.rdp(a)) for a in alpha]

    def epsilon_delta(self) -> Tuple[float, float]:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        eps, delta = list(self.subqueries)[0].epsilon_delta()
        return (
            eps * self.parameters['multiplicity'],
            delta * self.parameters['multiplicity'],
        )

    def pure_delta(self) -> float:
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        delta = list(self.subqueries)[0].pure_delta()
        return delta * cast(float, self.parameters['multiplicity'])

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()
            if existing_query:
                return cast(SQLPrivateQuery, existing_query)

        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )

        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.multiple_query,
            uuid=str(self.uuid),
            status=self.status,
            subqueries=[q.sql(session) for q in self.subqueries],
            parameters=self.parameters,
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
            subqueries=[str(subq.uuid) for subq in self.subqueries],
        )