Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
"""Privacy Accountant classes"""
# pylint: disable=redefined-builtin, too-many-lines

from __future__ import annotations

import datetime
import enum
import logging
from typing import (
    Collection,
    Iterable,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Union,
    cast,
)
from warnings import warn

import numpy as np
from sqlalchemy import (
    DateTime,
    Enum,
    Float,
    ForeignKey,
    Index,
    Integer,
    String,
    and_,
    desc,
)
from sqlalchemy.orm import Mapper, Query, mapped_column, relationship
from sqlalchemy.orm.session import Session
from sqlalchemy.sql import func
from sqlalchemy.sql.elements import BinaryExpression

from sarus_differential_privacy import analysis
from sarus_differential_privacy.accountant import (
    BasePrivacyAccountant,
    PrivacyProfile,
    PrivateQueryFilter,
)
from sarus_differential_privacy.query import (
    ComposedPrivateQuery,
    PrivateQuery,
    PrivateQueryStatus,
    SQLPrivateQuery,
)
from sarus_differential_privacy.typing import PrivacyLimit
from sarus_differential_privacy.utils import Base, Json, compare


class PrivacyProfileSQL(
    Base, PrivacyProfile
):  # pylint: disable=too-few-public-methods
    """Stores privacy states.

    A row is a snapshot of the privacy consumption seen by an accountant
    for a given filter (see ``RDPPrivacyAccountant.save_to_profile``).
    Privacy profiles are deleted if a query contained in it is updated.
    """

    __tablename__ = "profile"
    # Surrogate primary key.
    id = mapped_column(
        Integer, primary_key=True
    )  # pylint: disable=invalid-name
    # Accountant that produced the snapshot ("accountant" table); the
    # relationship also exposes `profiles` on the accountant side.
    accountant_id = mapped_column(Integer, ForeignKey("accountant.id"))
    accountant = relationship("RDPPrivacyAccountant", backref="profiles")
    # Number of rows in the query table when the snapshot was taken; readers
    # only add the queries whose id is greater than this value.
    max_id = mapped_column(Integer)
    # Creation time; the most recent matching profile is the one used.
    timestamp = mapped_column(DateTime)
    # Filter the snapshot was computed for, stored as JSON.
    filter = mapped_column(Json(128))  # 128 is a placeholder
    # RDP values of the snapshot, stored as JSON.
    # NOTE(review): presumably one value per accountant alpha - confirm.
    rdp = mapped_column(Json(128))
    # Total delta of the mechanisms that are not accounted through RDP.
    delta = mapped_column(Float)


# pylint: disable=invalid-name
class AccountantType(enum.Enum):  # pylint: disable=too-few-public-methods
    """Enum of accountant types.

    The value is stored in the ``type`` column of the "accountant" table,
    which is the polymorphic discriminator of ``RDPPrivacyAccountant``.
    """

    # Polymorphic identity of RDPPrivacyAccountant.
    rdp_accountant = 1
    # NOTE(review): not referenced in this part of the file; presumably the
    # identity of an odometer subclass defined elsewhere - confirm.
    rdp_odometer = 2


# pylint: disable=no-self-use, too-many-public-methods
class RDPPrivacyAccountant(Base, BasePrivacyAccountant):
    """The privacy accountant.
    For now it refers to a ledger and computes privacy based on RDP.
    Note that epsilon_RDP is always nondecreasing in alpha.
    If importing an older db and the new alphas contain elements not used
    before, there are two strategies:
    - if compute_new_alphas == False (default), the considered alphas will be
        the intersection of the previous and the new alphas (if it is empty,
        an error is raised)
    - if compute_new_alphas == True, RDP for old queries will be computed for
        the new alphas and added to the db
    """

    __tablename__ = "accountant"
    id = mapped_column(Integer, primary_key=True)
    type = mapped_column(Enum(AccountantType))
    alphas = mapped_column(Json(1024))

    __mapper_args__ = {
        "polymorphic_on": type,
        "polymorphic_identity": AccountantType.rdp_accountant,
    }

    def __init__(
        self,
        session: Optional[Session],
        alphas: Optional[Sequence[float]] = None,
        compute_new_alphas: bool = False,
        warning_no_session: bool = True,
    ):
        """Build the accountant and, when a session is given, persist it.

        Args:
            session: SQLAlchemy session giving access to the ledger. With
                None, every method that needs the database raises
                ValueError.
            alphas: RDP orders to account for. Defaults to 1.1, 1.2, ...,
                10.9 followed by the integers 12 to 63.
            compute_new_alphas: what to do when the RDP table already holds
                rows and `alphas` contains orders absent from it. If True,
                the RDP of the registered queries is computed for the
                missing orders and inserted; if False, only the orders
                shared by `alphas` and the table are kept.
            warning_no_session: whether to log a warning when `session` is
                None.

        Raises:
            ValueError: if `compute_new_alphas` is False and none of the
                requested orders is already present in the RDP table.
        """
        super().__init__()
        self._session = session
        if alphas is None:
            alphas = tuple(
                [1 + x / 10.0 for x in range(1, 100)] + list(range(12, 64))
            )
        if session is None:
            if warning_no_session:
                logging.warning(
                    "Session is None: only epsilon_query, \
                    delta_query and gauge_query are available"
                )
            self.alphas = tuple(alphas)
        else:
            requested = set(alphas)
            # orders that already have rows in the RDP table
            previous_alphas = {
                a[0]
                for a in self.session()  # type: ignore[union-attr]
                .query(RDP.alpha)
                .all()
            }
            if not previous_alphas or requested <= previous_alphas:
                self.alphas = tuple(alphas)
            else:
                logging.warning(
                    "alphas contains elements that weren't used before"
                )
                if compute_new_alphas:
                    self.alphas = tuple(alphas)
                    logging.warning(
                        "compute_new_alphas == True: computing \
                        rdps for new alphas"
                    )
                    # Backfill only the orders missing from the table: a
                    # symmetric difference would also re-insert rows for
                    # orders that are already stored.
                    missing = sorted(requested - previous_alphas)
                    # The ledger does not change while backfilling, so it
                    # is loaded once instead of once per order.
                    registered = self.queries()
                    rdp_instances = []
                    for alpha in missing:
                        for q in registered:
                            rdp = q.rdp(alpha)
                            if rdp:
                                rdp_instances.append(
                                    {
                                        'query': str(q.uuid),
                                        'alpha': alpha,
                                        'rdp': rdp,
                                    }
                                )
                    session.bulk_insert_mappings(
                        cast(Mapper, RDP), rdp_instances
                    )
                else:
                    intersection = requested & previous_alphas
                    # The set is passed as is: wrapping it in another set
                    # display raises TypeError (sets are unhashable).
                    logging.warning(
                        "compute_new_alphas == False: using intersection of \
                        sets: %s",
                        intersection,
                    )
                    if not intersection:
                        raise ValueError("No suitable alphas have been found")
                    # RDP values are paired positionally with self.alphas,
                    # so keep a deterministic ascending order.
                    self.alphas = tuple(sorted(intersection))

            self.session().add(self)  # type: ignore[union-attr]
            self.session().commit()  # type: ignore[union-attr]

    def session(self) -> Optional[Session]:
        """Give access to the SQLAlchemy session stored on the accountant.

        The value is None for an accountant built without a session.
        """
        current_session = self._session
        return current_session

    # pylint: disable=bad-builtin, cell-var-from-loop
    def queries(
        self, _filter: Optional[PrivateQueryFilter] = None
    ) -> List[PrivateQuery]:
        """List the registered queries, optionally restricted by `_filter`.

        Args:
            _filter: conditions applied to the stored queries; None selects
                every stored query.

        Returns:
            The `private_query()` of every selected row.

        Raises:
            ValueError: if the accountant has no session.
        """
        if not self.session():
            raise ValueError('Cannot use this function without a Session !')

        selection = self.session().query(  # type: ignore[union-attr]
            SQLPrivateQuery
        )
        if _filter is not None:
            # each condition narrows the selection a bit more
            for condition in self._filter_sql_queries(_filter):
                selection = selection.filter(condition)
        return [row.private_query() for row in selection.all()]

    def _filter_sql_queries(
        self, _filter: PrivateQueryFilter
    ) -> List[BinaryExpression]:
        filters = []
        for raw in _filter.filter_condition:
            try:
                key, op, value = raw
            except ValueError:
                raise Exception("Invalid filter: %s" % raw)
            column = getattr(SQLPrivateQuery, key, None)
            if not column:
                raise Exception("Invalid filter column: %s" % key)
            if op == "in":
                if isinstance(value, list):
                    filt = column.in_(value)
                else:
                    filt = column.in_(value.split(","))
            elif op == "not in":
                if isinstance(value, list):
                    filt = column.not_in(value)
                else:
                    filt = column.not_in(value.split(","))
            else:
                try:
                    attr = (
                        list(
                            filter(
                                lambda e: hasattr(column, e % op),
                                ["%s", "%s_", "__%s__"],
                            )
                        )[0]
                        % op
                    )
                except IndexError as index_error:
                    raise Exception(
                        f"Invalid filter operator: {op}"
                    ) from index_error
                if value == "null":
                    value = None
                filt = getattr(column, attr)(value)
            filters.append(filt)
        return filters

    def profile(
        self, filter: Optional[PrivateQueryFilter] = None
    ) -> Optional[PrivacyProfileSQL]:
        """Fetch the most recent stored profile matching `filter`.

        Profiles are read newest first and the first one whose stored
        filter compares equal to the normalized `filter` is returned.
        Open question kept from the original author: should the alphas be
        an optional argument?

        Returns:
            The matching profile, or None when there is none.

        Raises:
            ValueError: if the accountant has no session.
        """
        if not self.session():
            raise ValueError('Cannot use this function without a Session !')

        wanted = self._profile_filter(filter)
        newest_first = desc(PrivacyProfileSQL.timestamp)
        stored = (
            self.session()  # type: ignore[union-attr]
            .query(PrivacyProfileSQL)
            .join(PrivacyProfileSQL.accountant)
            .group_by(PrivacyProfileSQL)
            .order_by(newest_first)
            .all()
        )
        matching = [
            candidate
            for candidate in stored
            if compare(wanted, candidate.filter)
        ]
        if matching:
            return matching[0]  # type: ignore[no-any-return]
        return None

    def save_to_profile(
        self, filter: Optional[PrivateQueryFilter] = None
    ) -> PrivacyProfileSQL:
        """Snapshot the current (filtered) ledger into a PrivacyProfileSQL.

        The snapshot holds the RDP values, the non-RDP delta, the current
        time and the number of stored queries. It is added to the session
        but not committed here.

        Raises:
            ValueError: if the accountant has no session.
        """
        if not self.session():
            raise ValueError('Cannot use this function without a Session !')

        stored_filter = self._profile_filter(filter)
        non_rdp_delta = self.pure_delta(filter)
        rdp_values = self.rdp(filter)
        taken_at = datetime.datetime.now()
        query_count = (
            self.session()  # type: ignore[union-attr]
            .query(SQLPrivateQuery)
            .count()
        )
        snapshot = PrivacyProfileSQL(
            accountant=self,
            filter=stored_filter,
            rdp=rdp_values,
            timestamp=taken_at,
            max_id=query_count,
            delta=non_rdp_delta,
        )
        self.session().add(snapshot)  # type: ignore[union-attr]
        return snapshot

    def _register(
        self, query: Union[PrivateQuery, Collection[PrivateQuery]]
    ) -> None:
        """Register single non-composed query"""
        if not self.session():
            raise ValueError('Cannot use this function without a Session !')

        if isinstance(query, PrivateQuery):
            queries: Collection[PrivateQuery] = {query}
        else:
            queries = query
        self._update(status=PrivateQueryStatus.executed, queries=queries)
        self.session().add_all(  # type: ignore[union-attr]
            [q.sql(self.session()) for q in queries]
        )

    def _register_rdp(
        self, query: Union[PrivateQuery, Collection[PrivateQuery]]
    ) -> None:
        """Register rdp profile of query in the RDP table"""
        session = self.session()
        if not session:
            raise ValueError('Cannot use this function without a Session !')

        if isinstance(query, PrivateQuery):
            queries: Collection[PrivateQuery] = {query}
        else:
            queries = query

        descendants: Set[PrivateQuery] = set()
        for q in queries:
            descendants.update(
                q.all_subqueries()
                if isinstance(q, ComposedPrivateQuery)
                else {q}
            )
        descendants = {
            q
            for q in descendants
            if not cast(
                Query, session.query(RDP).where(RDP.query == str(q.uuid))
            ).first()
        }
        rdp_instances = []
        for alpha in self.alphas:
            for q in descendants:
                rdp = q.rdp(alpha)
                if rdp:
                    rdp_instances.append(
                        {
                            'query': str(q.uuid),
                            'alpha': alpha,
                            'rdp': rdp,
                        }
                    )
        session.bulk_insert_mappings(cast(Mapper, RDP), rdp_instances)

    def _register_pure_delta(
        self, query: Union[PrivateQuery, Collection[PrivateQuery]]
    ) -> None:
        """Register non-rdp profile of query in  the EpsilonDelta table.
        If query is RDP, it is not added"""
        session = self.session()
        if not session:
            raise ValueError('Cannot use this function without a Session !')

        if isinstance(query, PrivateQuery):
            queries: Collection[PrivateQuery] = {query}
        else:
            queries = query

        deltas = []
        descendants: Set[PrivateQuery] = set()
        for q in queries:
            descendants.update(
                q.all_subqueries()
                if isinstance(q, ComposedPrivateQuery)
                else {q}
            )
        descendants = {
            q
            for q in descendants
            if not cast(
                Query,
                session.query(EpsilonDelta).where(
                    EpsilonDelta.query == str(q.uuid)
                ),
            ).first()
        }
        for q in descendants:
            # pylint: disable=fixme
            epsilon, delta = q.epsilon_delta()  # TODO replace by pure_delta()
            if delta != 0:
                deltas.append(
                    EpsilonDelta(
                        query=str(q.uuid),
                        epsilon=epsilon,
                        delta=delta,
                    )
                )
        session.add_all(deltas)

    def register(
        self, query: Union[PrivateQuery, Collection[PrivateQuery]]
    ) -> RDPPrivacyAccountant:
        """Record the query (or queries) in the ledger and commit.

        The queries are staged, then their RDP values and their non-RDP
        deltas are stored, and the session is committed.

        Returns:
            The accountant itself.

        Raises:
            ValueError: if the accountant has no session.
        """
        if not self.session():
            raise ValueError('Cannot use this function without a Session !')

        for step in (
            self._register,
            self._register_rdp,
            self._register_pure_delta,
        ):
            step(query)
        self.session().commit()  # type: ignore[union-attr]
        return self

    def verify_register(
        self,
        limit: Optional[PrivacyLimit],
        query: Union[PrivateQuery, Collection[PrivateQuery]],
        filter: Optional[PrivateQueryFilter] = None,
    ) -> bool:
        """Record the query only if it fits in the privacy limit.

        A nested transaction is opened and the query is staged. If
        `verify_new_query` accepts it, its RDP values and non-RDP deltas
        are stored and the session is committed; otherwise the session is
        rolled back.

        Returns:
            True if the query was recorded, False if it was rejected.

        Raises:
            ValueError: if the accountant has no session.
        """
        if not self.session():
            raise ValueError('Cannot use this function without a Session !')

        self.session().begin_nested()  # type: ignore[union-attr]
        self._register(query)

        accepted = self.verify_new_query(query, limit, filter)
        if not accepted:
            self.session().rollback()  # type: ignore[union-attr]
            return False

        self._register_rdp(query)
        self._register_pure_delta(query)
        self.session().commit()  # type: ignore[union-attr]
        return True

    def verify_register_multiple(
        self,
        constraints: List[
            Tuple[Optional[PrivacyLimit], Optional[PrivateQueryFilter]]
        ],
        query: Union[PrivateQuery, Collection[PrivateQuery]],
    ) -> bool:
        """Record the query only if it fits in every (limit, filter) pair.

        Same procedure as `verify_register`, except that the query must be
        accepted by `verify_new_query` for all the constraints. Every
        constraint is evaluated, even after a first rejection.

        Returns:
            True if the query was recorded, False if it was rejected.

        Raises:
            ValueError: if the accountant has no session.
        """
        if not self.session():
            raise ValueError('Cannot use this function without a Session !')

        self.session().begin_nested()  # type: ignore[union-attr]
        self._register(query)

        verdicts = [
            self.verify_new_query(query, privacy_limit, query_filter)
            for privacy_limit, query_filter in constraints
        ]
        if not all(verdicts):
            self.session().rollback()  # type: ignore[union-attr]
            return False

        self._register_rdp(query)
        self._register_pure_delta(query)
        self.session().commit()  # type: ignore[union-attr]
        return True

    def _update(
        self,
        status: PrivateQueryStatus,
        queries: Collection[PrivateQuery],
    ) -> RDPPrivacyAccountant:
        """Update the status of the queries in `queries` and all their
        subqueries.
        This one does NOT commit to the DB; use update() if you want to."""
        session = self.session()
        if not session:
            raise ValueError('Cannot use this function without a Session !')

        queries_and_subqueries: List[PrivateQuery] = list()
        for q in queries:
            if isinstance(q, ComposedPrivateQuery):
                queries_and_subqueries += q.all_subqueries()
            else:
                queries_and_subqueries.append(q)

        uuid_set = {str(q.uuid) for q in queries_and_subqueries}
        for query in queries_and_subqueries:
            query.status = status

        nb_updated_rows = cast(
            Query,
            session.query(SQLPrivateQuery).filter(
                SQLPrivateQuery.uuid.in_(uuid_set)
            ),
        ).update({'status': status}, synchronize_session='fetch')

        # remove PrivacyProfileSQL if it contains an updated query
        if nb_updated_rows:
            subquery = session.query(func.min(SQLPrivateQuery.id)).filter(
                SQLPrivateQuery.uuid.in_(uuid_set)
            )
            cast(
                Query,
                session.query(PrivacyProfileSQL).filter(
                    PrivacyProfileSQL.max_id >= subquery
                ),
            ).delete(synchronize_session='fetch')
        return self

    def update(
        self,
        status: PrivateQueryStatus,
        queries: Collection[PrivateQuery],
    ) -> RDPPrivacyAccountant:
        """Set `status` on the queries and their subqueries, then commit.

        Returns:
            The accountant itself.

        Raises:
            ValueError: if the accountant has no session.
        """
        db = self.session()
        if not db:
            raise ValueError('Cannot use this function without a Session !')

        self._update(status=status, queries=queries)
        db.commit()
        return self

    def _epsilon_from_rdp(self, rdp: List[float], delta: float) -> float:
        if all(v == 0 for v in rdp):
            return 0

        return cast(
            float,
            analysis.get_privacy_spent(self.alphas, rdp, target_delta=delta)[
                0
            ],
        )

    def _delta_from_rdp(self, rdp: List[float], epsilon: float) -> float:
        if all(v == 0 for v in rdp):
            return 0

        return cast(
            float,
            analysis.get_privacy_spent(self.alphas, rdp, target_eps=epsilon)[
                1
            ],
        )

    def epsilon(
        self, delta: float, filter: Optional[PrivateQueryFilter] = None
    ) -> float:
        """Epsilon spent by the (filtered) ledger for a total `delta`.

        The delta of the non-RDP mechanisms is taken out of `delta` first;
        the remainder is the target delta of the RDP conversion.

        Raises:
            ValueError: if the non-RDP delta alone reaches `delta`.
        """
        not_rdp_delta = self.pure_delta(filter)
        if delta <= not_rdp_delta:
            raise ValueError(
                "Delta from EpsilonDeltaQueries is larger than \
                             total target delta"
            )
        remaining_delta = delta - not_rdp_delta
        return self._epsilon_from_rdp(self.rdp(filter), remaining_delta)

    def delta(
        self, epsilon: float, filter: Optional[PrivateQueryFilter] = None
    ) -> float:
        """Delta spent by the (filtered) ledger for the given `epsilon`.

        This is the delta obtained from the RDP values plus the delta of
        the non-RDP mechanisms.
        """
        not_rdp_delta = self.pure_delta(filter)
        rdp_values = self.rdp(filter)
        return self._delta_from_rdp(rdp_values, epsilon) + not_rdp_delta

    def rdp(self, filter: Optional[PrivateQueryFilter] = None) -> List[float]:
        """RDP values of the current (filtered) ledger.

        Only top-level queries are counted, so that subqueries are not
        counted twice. When a stored profile matches `filter`, its values
        are the starting point and only the queries with an id greater
        than the profile's `max_id` are read from the table.
        """
        table_filter = self._keep_top_queries(filter)

        snapshot = self.profile(filter)
        if snapshot:
            baseline = snapshot.rdp
            conditions = table_filter.filter_condition.copy()
            conditions.append(("id", "gt", snapshot.max_id))
            table_filter = PrivateQueryFilter(conditions)
        else:
            baseline = [0 for _ in self.alphas]

        from_table = self._rdp_from_table(table_filter, self.alphas)
        totals = []
        for fresh, stored in zip(from_table, baseline):
            totals.append(stored + fresh)
        return totals

    def _rdp_from_table(
        self, filter: PrivateQueryFilter, alphas: Sequence[float]
    ) -> List[float]:
        """Sum the RDP rows of the filtered queries (see RDP class).

        Args:
            filter: conditions selecting the queries to account for.
            alphas: RDP orders to report.

        Returns:
            One value per entry of `alphas`, in the same order. An order
            without any stored row counts for 0.0, so the result always
            lines up with `alphas` (zero RDP values are never stored, and
            the grouped rows come back in no guaranteed order).

        Raises:
            ValueError: if the accountant has no session.
        """
        session = self.session()
        if not session:
            raise ValueError('Cannot use this function without a Session !')

        filters = self._filter_sql_queries(filter)

        rdp_alpha = (
            session.query(func.sum(RDP.rdp), RDP.alpha)
            .join(SQLPrivateQuery)
            .filter(and_(*filters))
            .filter(RDP.alpha.in_(alphas))
            .group_by(RDP.alpha)
            .all()
        )

        # index the sums by order; float() makes 12 and 12.0 the same key
        total_by_alpha = {
            float(alpha): total
            for total, alpha in cast(Iterable[Tuple[float, float]], rdp_alpha)
        }
        return [total_by_alpha.get(float(alpha), 0.0) for alpha in alphas]

    def epsilon_delta(
        self, filter: Optional[PrivateQueryFilter] = None
    ) -> Tuple[float, float]:
        """Total (epsilon, delta) of the mechanisms not expressed in RDP.

        Deprecated in favour of `pure_delta()`. Simple composition is used.
        When a stored profile matches `filter`, its delta is added to the
        delta of the queries stored after it; the epsilon only comes from
        the table.
        """
        warn(
            'This is deprecated. Please use pure_delta() for non-rdp privacy '
            'consumption.',
            DeprecationWarning,
            stacklevel=2,
        )

        table_filter = self._keep_top_queries(filter)

        snapshot = self.profile(filter)
        delta_profile = 0.0
        if snapshot:
            delta_profile = snapshot.delta
            conditions = table_filter.filter_condition.copy()
            conditions.append(("id", "gt", snapshot.max_id))
            table_filter = PrivateQueryFilter(conditions)

        epsilon_table, delta_table = self._epsilon_delta_from_table(
            table_filter
        )
        return epsilon_table, delta_profile + delta_table

    def _epsilon_delta_from_table(
        self, filter: PrivateQueryFilter
    ) -> Tuple[float, float]:
        """Returns the (epsilon, delta) contained in the EpsilonDelta table"""
        if not self.session():
            raise ValueError('Cannot use this function without a Session !')

        filters = self._filter_sql_queries(filter)

        epsilon = (
            self.session()  # type: ignore[union-attr]
            .query(func.sum(EpsilonDelta.epsilon))
            .join(SQLPrivateQuery)
            .filter(and_(*filters))
            .scalar()
        )

        delta = (
            self.session()  # type: ignore[union-attr]
            .query(func.sum(EpsilonDelta.delta))
            .join(SQLPrivateQuery)
            .filter(and_(*filters))
            .scalar()
        )

        if epsilon is None:
            return 0, 0
        return epsilon, delta

    def pure_delta(self, filter: Optional[PrivateQueryFilter] = None) -> float:
        """Total delta of the mechanisms that cannot be translated to RDP.

        Simple composition is used: the deltas are added. When a stored
        profile matches `filter`, its delta is the starting point and only
        the queries with an id greater than the profile's `max_id` are read
        from the table.
        """
        table_filter = self._keep_top_queries(filter)

        snapshot = self.profile(filter)
        delta_profile: float = 0.0
        if snapshot:
            delta_profile = snapshot.delta
            conditions = table_filter.filter_condition.copy()
            conditions.append(("id", "gt", snapshot.max_id))
            table_filter = PrivateQueryFilter(conditions)

        delta_table: float = self._pure_delta_from_table(table_filter)
        return delta_profile + delta_table

    def _pure_delta_from_table(self, filter: PrivateQueryFilter) -> float:
        """Returns the delta contained in the EpsilonDelta table"""
        if not self.session():
            raise ValueError('Cannot use this function without a Session !')

        filters = self._filter_sql_queries(filter)

        delta = (
            self.session()  # type: ignore[union-attr]
            .query(func.sum(EpsilonDelta.delta))
            .join(SQLPrivateQuery)
            .filter(and_(*filters))
            .scalar()
        )

        if delta is None:
            return 0
        return cast(float, delta)

    def _keep_top_queries(
        self, filter: Optional[PrivateQueryFilter]
    ) -> PrivateQueryFilter:
        """Remove subqueries for rdp and epsilon-delta computations"""
        if filter is None:
            new_filter = PrivateQueryFilter([("supquery", "eq", "null")])
        elif "supquery" not in [row[0] for row in filter.filter_condition]:
            new_condition = filter.filter_condition.copy()
            new_condition.append(("supquery", "eq", "null"))
            new_filter = PrivateQueryFilter(new_condition)
        else:
            new_filter = filter

        return new_filter

    def epsilon_new_query(
        self,
        delta: float,
        query: Union[PrivateQuery, Collection[PrivateQuery]],
        filter: Optional[PrivateQueryFilter] = None,
    ) -> float:
        """Epsilon of the ledger plus `query`, for a total `delta`.

        Returns infinity when the non-RDP delta alone reaches `delta`.
        """
        not_rdp_delta = self.pure_delta_new_query(query, filter)
        if delta <= not_rdp_delta:
            return cast(float, np.inf)
        combined_rdp = self.rdp_new_query(query, filter)
        return self._epsilon_from_rdp(combined_rdp, delta - not_rdp_delta)

    def delta_new_query(
        self,
        epsilon: float,
        query: Union[PrivateQuery, Collection[PrivateQuery]],
        filter: Optional[PrivateQueryFilter] = None,
    ) -> float:
        """Delta of the ledger plus `query`, for the given `epsilon`.

        This is the delta obtained from the combined RDP values plus the
        combined non-RDP delta.
        """
        not_rdp_delta = self.pure_delta_new_query(query, filter)
        combined_rdp = self.rdp_new_query(query, filter)
        return self._delta_from_rdp(combined_rdp, epsilon) + not_rdp_delta

    def rdp_new_query(
        self,
        query: Union[PrivateQuery, Collection[PrivateQuery]],
        filter: Optional[PrivateQueryFilter] = None,
    ) -> List[float]:
        """RDP values of the ledger plus `query`, one per alpha.

        From a collection only the queries of depth 0 are kept. The given
        queries (and their subqueries) are excluded from the ledger part so
        that they are not counted twice.
        """
        if isinstance(query, PrivateQuery):
            top_level: Collection[PrivateQuery] = {query}
        else:
            top_level = {item for item in query if item.depth == 0}

        ledger_filter = self._add_uuid_to_filter(filter, top_level)
        totals = []
        for alpha, spent in zip(self.alphas, self.rdp(ledger_filter)):
            incoming = sum([cast(float, item.rdp(alpha)) for item in top_level])
            totals.append(spent + incoming)
        return totals

    def epsilon_delta_new_query(
        self,
        query: Union[PrivateQuery, Collection[PrivateQuery]],
        filter: Optional[PrivateQueryFilter] = None,
    ) -> Tuple[float, float]:
        """Non-RDP (epsilon, delta) of the ledger plus `query`.

        Deprecated in favour of `pure_delta_new_query()`. From a collection
        only the queries of depth 0 are kept.
        """
        warn(
            'This is deprecated. Please use pure_delta_new_query() for '
            'non-rdp privacy consumption.',
            DeprecationWarning,
            stacklevel=2,
        )

        if isinstance(query, PrivateQuery):
            top_level: Collection[PrivateQuery] = {query}
        else:
            top_level = {item for item in query if item.depth == 0}

        ledger_filter = self._add_uuid_to_filter(filter, top_level)
        ledger_eps, ledger_delta = self.epsilon_delta(ledger_filter)
        incoming = [item.epsilon_delta() for item in top_level]
        incoming_eps = sum([pair[0] for pair in incoming])
        incoming_delta = sum([pair[1] for pair in incoming])
        return ledger_eps + incoming_eps, ledger_delta + incoming_delta

    def pure_delta_new_query(
        self,
        query: Union[PrivateQuery, Collection[PrivateQuery]],
        filter: Optional[PrivateQueryFilter] = None,
    ) -> float:
        """Non-RDP delta of the ledger plus `query`.

        From a collection only the queries of depth 0 are kept. The given
        queries (and their subqueries) are excluded from the ledger part so
        that they are not counted twice.
        """
        if isinstance(query, PrivateQuery):
            top_level: Collection[PrivateQuery] = {query}
        else:
            top_level = {item for item in query if item.depth == 0}

        ledger_filter = self._add_uuid_to_filter(filter, top_level)
        ledger_delta = self.pure_delta(ledger_filter)
        incoming_delta = sum([item.pure_delta() for item in top_level])
        return ledger_delta + incoming_delta

    def _add_uuid_to_filter(
        self,
        filter: Optional[PrivateQueryFilter],
        queries: Collection[PrivateQuery],
    ) -> PrivateQueryFilter:
        """Build a filter that also excludes `queries` and their subqueries.

        The conditions of `filter` (if any) are copied and a "not in"
        condition on the uuids is appended; `filter` itself is not
        modified.
        """
        excluded: Set[PrivateQuery] = set()
        for item in queries:
            if isinstance(item, ComposedPrivateQuery):
                excluded.update(item.all_subqueries())
            else:
                excluded.add(item)

        conditions = [] if filter is None else filter.filter_condition.copy()
        conditions.append(
            ("uuid", "not in", [str(item.uuid) for item in excluded])
        )
        return PrivateQueryFilter(conditions)

    def epsilon_query(
        self,
        delta: float,
        query: Union[PrivateQuery, Collection[PrivateQuery]],
    ) -> float:
        """Epsilon of `query` alone, for a total `delta`.

        Returns infinity when the non-RDP delta of the query alone reaches
        `delta`.
        """
        not_rdp_delta = self.pure_delta_query(query)
        if delta <= not_rdp_delta:
            return cast(float, np.inf)
        query_rdp = self.rdp_query(query)
        return self._epsilon_from_rdp(query_rdp, delta - not_rdp_delta)

    def delta_query(
        self,
        epsilon: float,
        query: Union[PrivateQuery, Collection[PrivateQuery]],
    ) -> float:
        """Delta of `query` alone, for the given `epsilon`.

        This is the delta obtained from the RDP values of the query plus
        its non-RDP delta.
        """
        not_rdp_delta = self.pure_delta_query(query)
        query_rdp = self.rdp_query(query)
        return self._delta_from_rdp(query_rdp, epsilon) + not_rdp_delta

    def rdp_query(
        self, query: Union[PrivateQuery, Collection[PrivateQuery]]
    ) -> List[float]:
        """RDP values of `query` alone, one per alpha.

        From a collection only the queries of depth 0 are kept and their
        values are added.
        """
        if isinstance(query, PrivateQuery):
            top_level: Collection[PrivateQuery] = {query}
        else:
            top_level = {item for item in query if item.depth == 0}

        totals = []
        for alpha in self.alphas:
            per_query = [item.rdp(alpha) for item in top_level]
            totals.append(sum(per_query))  # type: ignore[misc]
        return totals

    def epsilon_delta_query(
        self, query: Union[PrivateQuery, Collection[PrivateQuery]]
    ) -> Tuple[float, float]:
        """Non-RDP (epsilon, delta) of `query` alone.

        Deprecated in favour of `pure_delta_query()`. From a collection only
        the queries of depth 0 are kept and their values are added.
        """
        warn(
            'This is deprecated. Please use pure_delta_query() for non-rdp '
            'privacy consumption.',
            DeprecationWarning,
            stacklevel=2,
        )

        if isinstance(query, PrivateQuery):
            top_level: Collection[PrivateQuery] = {query}
        else:
            top_level = {item for item in query if item.depth == 0}

        pairs = [item.epsilon_delta() for item in top_level]
        total_eps = sum([pair[0] for pair in pairs])
        total_delta = sum([pair[1] for pair in pairs])
        return total_eps, total_delta

    def pure_delta_query(
        self, query: Union[PrivateQuery, Collection[PrivateQuery]]
    ) -> float:
        """Non-RDP delta of `query` alone.

        From a collection only the queries of depth 0 are kept and their
        deltas are added.
        """
        if isinstance(query, PrivateQuery):
            top_level: Collection[PrivateQuery] = {query}
        else:
            top_level = {item for item in query if item.depth == 0}

        deltas = [item.pure_delta() for item in top_level]
        return sum(deltas)


class RDP(Base):  # pylint: disable=too-few-public-methods
    # see https://github.com/PyCQA/pylint/issues/3525
    """Stores rdp as (id, query, alpha, rdp)
    This way, the rdp is not recomputed each time.

    There is one row per (query, alpha) pair; the accountant does not
    insert rows whose rdp value is falsy (zero or missing)."""

    __tablename__ = "rdp"
    # Surrogate primary key.
    id = mapped_column(Integer, primary_key=True)
    # uuid of the query the value belongs to ("query" table).
    query = mapped_column(String, ForeignKey("query.uuid", onupdate="cascade"))
    # RDP order.
    alpha = mapped_column(Float)
    # RDP value of the query at that order.
    rdp = mapped_column(Float)
    # Composite index on (query, alpha).
    __table_args__ = (Index("idx_query_alpha", "query", "alpha"),)


class EpsilonDelta(Base):  # pylint: disable=too-few-public-methods
    """Stores non-rdp (epsilon, delta) queries.

    The accountant only inserts a row for a query whose delta is not 0."""

    __tablename__ = "epsilon_delta"
    # Surrogate primary key.
    id = mapped_column(Integer, primary_key=True)
    # uuid of the query the values belong to ("query" table).
    query = mapped_column(String, ForeignKey("query.uuid", onupdate="cascade"))
    # (epsilon, delta) pair reported by the query's `epsilon_delta()`.
    epsilon = mapped_column(Float)
    delta = mapped_column(Float)