Repository URL to install this package:
|
Version:
1.0.1 ▾
|
"""Privacy Accountant classes"""
# pylint: disable=redefined-builtin, too-many-lines
from __future__ import annotations
import datetime
import enum
import logging
from typing import (
Collection,
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
from warnings import warn
import numpy as np
from sqlalchemy import (
DateTime,
Enum,
Float,
ForeignKey,
Index,
Integer,
String,
and_,
desc,
)
from sqlalchemy.orm import Mapper, Query, mapped_column, relationship
from sqlalchemy.orm.session import Session
from sqlalchemy.sql import func
from sqlalchemy.sql.elements import BinaryExpression
from sarus_differential_privacy import analysis
from sarus_differential_privacy.accountant import (
BasePrivacyAccountant,
PrivacyProfile,
PrivateQueryFilter,
)
from sarus_differential_privacy.query import (
ComposedPrivateQuery,
PrivateQuery,
PrivateQueryStatus,
SQLPrivateQuery,
)
from sarus_differential_privacy.typing import PrivacyLimit
from sarus_differential_privacy.utils import Base, Json, compare
class PrivacyProfileSQL(
    Base, PrivacyProfile
):  # pylint: disable=too-few-public-methods
    """Stores privacy states.

    Each row is a snapshot of the privacy consumption of an accountant for
    a given query filter, stored in the ``profile`` table.
    Privacy profiles are deleted if a query contained in it is updated.
    """

    __tablename__ = "profile"
    # Surrogate primary key.
    id = mapped_column(
        Integer, primary_key=True
    )  # pylint: disable=invalid-name
    # Owning accountant; the relationship also exposes
    # ``RDPPrivacyAccountant.profiles`` through the backref.
    accountant_id = mapped_column(Integer, ForeignKey("accountant.id"))
    accountant = relationship("RDPPrivacyAccountant", backref="profiles")
    # Integer bound used by the accountant: queries whose id is greater than
    # this value are treated as not yet included in the snapshot.
    max_id = mapped_column(Integer)
    # Creation time of the snapshot.
    timestamp = mapped_column(DateTime)
    # Serialized query filter the snapshot was computed for.
    filter = mapped_column(Json(128))  # 128 is a placeholder
    # Serialized RDP values of the snapshot.
    rdp = mapped_column(Json(128))
    # Delta of the snapshot.
    delta = mapped_column(Float)
# pylint: disable=invalid-name
class AccountantType(enum.Enum):  # pylint: disable=too-few-public-methods
    """Enum of accountant types.

    Used as the polymorphic discriminator of the ``accountant`` table.
    """

    rdp_accountant = 1
    rdp_odometer = 2
# pylint: disable=no-self-use, too-many-public-methods
class RDPPrivacyAccountant(Base, BasePrivacyAccountant):
    """The privacy accountant.

    For now it refers to a ledger and computes privacy based on RDP.
    Note that epsilon_RDP is always nondecreasing in alpha.

    If importing an older db and the alphas contain elements not used
    before, there are two strategies:
    - if compute_new_alphas == False (default), considered alphas will be
      the intersection of the previous and the new alphas (if that
      intersection is empty, raise an error)
    - if compute_new_alphas == True, RDP for old queries will be computed
      and added to db
    """

    __tablename__ = "accountant"
    id = mapped_column(Integer, primary_key=True)
    # Discriminator column for single-table polymorphism (see
    # ``__mapper_args__`` below).
    type = mapped_column(Enum(AccountantType))
    # Serialized RDP orders used by this accountant.
    alphas = mapped_column(Json(1024))
    __mapper_args__ = {
        "polymorphic_on": type,
        "polymorphic_identity": AccountantType.rdp_accountant,
    }
def __init__(
    self,
    session: Optional[Session],
    alphas: Optional[Sequence[float]] = None,
    compute_new_alphas: bool = False,
    warning_no_session: bool = True,
):
    """Create the accountant and reconcile its alphas with the database.

    Args:
        session: SQLAlchemy session, or ``None`` for a session-less
            accountant (nothing is read from or written to a database).
        alphas: RDP orders to use. Defaults to 1.1, 1.2, ..., 10.9
            followed by the integers 12 to 63.
        compute_new_alphas: when some requested alphas have no row in the
            RDP table, ``True`` computes and stores the RDP of the already
            registered queries for those alphas; ``False`` restricts the
            accountant to the alphas that are both requested and stored.
        warning_no_session: log a warning when ``session`` is ``None``.

    Raises:
        ValueError: if ``compute_new_alphas`` is ``False`` and no requested
            alpha is already stored.
    """
    super().__init__()
    self._session = session
    if alphas is None:
        alphas = tuple(
            [1 + x / 10.0 for x in range(1, 100)] + list(range(12, 64))
        )
    if session is None:
        if warning_no_session:
            logging.warning(
                "Session is None: only epsilon_query, "
                "delta_query and gauge_query are available"
            )
        self.alphas = tuple(alphas)
        return

    # Check whether queries are already registered in DB with alphas that
    # differ from the requested ones.
    requested_alphas = set(alphas)
    previous_alphas = {row[0] for row in session.query(RDP.alpha).all()}
    if not previous_alphas or requested_alphas <= previous_alphas:
        self.alphas = tuple(alphas)
    else:
        logging.warning("alphas contains elements that weren't used before")
        if compute_new_alphas:
            self.alphas = tuple(alphas)
            logging.warning(
                "compute_new_alphas == True: computing rdps for new alphas"
            )
            # Only the alphas missing from the table need new rows. A
            # symmetric difference would also re-insert rows for alphas
            # that are stored but no longer requested, duplicating them.
            new_alphas = requested_alphas - previous_alphas
            # Load the registered queries once, not once per alpha.
            registered_queries = self.queries()
            rdp_instances = []
            for alpha in new_alphas:
                for q in registered_queries:
                    rdp = q.rdp(alpha)
                    if rdp:
                        rdp_instances.append(
                            {
                                'query': str(q.uuid),
                                'alpha': alpha,
                                'rdp': rdp,
                            }
                        )
            session.bulk_insert_mappings(cast(Mapper, RDP), rdp_instances)
        else:
            intersection = requested_alphas & previous_alphas
            # Pass the set itself: wrapping it in another set literal
            # raises TypeError because sets are unhashable.
            logging.warning(
                "compute_new_alphas == False: using intersection of "
                "sets: %s",
                intersection,
            )
            if not intersection:
                raise ValueError("No suitable alphas have been found")
            # Sorted so the tuple has a deterministic, increasing order.
            self.alphas = tuple(sorted(intersection))
    session.add(self)
    session.commit()
def session(self) -> Optional[Session]:
    """Return the SQLAlchemy session bound to this accountant.

    The value is ``None`` when the accountant was created without one.
    """
    bound_session = self._session
    return bound_session
# pylint: disable=bad-builtin, cell-var-from-loop
def queries(
    self, _filter: Optional[PrivateQueryFilter] = None
) -> List[PrivateQuery]:
    """Return the stored queries, optionally restricted by ``_filter``.

    Every matching ``SQLPrivateQuery`` row is converted with its
    ``private_query()`` method.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    selection = session.query(SQLPrivateQuery)
    if _filter is not None:
        # Each condition is applied as an additional filter (AND).
        for criterion in self._filter_sql_queries(_filter):
            selection = selection.filter(criterion)
    return [row.private_query() for row in selection.all()]
def _filter_sql_queries(
self, _filter: PrivateQueryFilter
) -> List[BinaryExpression]:
filters = []
for raw in _filter.filter_condition:
try:
key, op, value = raw
except ValueError:
raise Exception("Invalid filter: %s" % raw)
column = getattr(SQLPrivateQuery, key, None)
if not column:
raise Exception("Invalid filter column: %s" % key)
if op == "in":
if isinstance(value, list):
filt = column.in_(value)
else:
filt = column.in_(value.split(","))
elif op == "not in":
if isinstance(value, list):
filt = column.not_in(value)
else:
filt = column.not_in(value.split(","))
else:
try:
attr = (
list(
filter(
lambda e: hasattr(column, e % op),
["%s", "%s_", "__%s__"],
)
)[0]
% op
)
except IndexError as index_error:
raise Exception(
f"Invalid filter operator: {op}"
) from index_error
if value == "null":
value = None
filt = getattr(column, attr)(value)
filters.append(filt)
return filters
def profile(
    self, filter: Optional[PrivateQueryFilter] = None
) -> Optional[PrivacyProfileSQL]:
    """Return the most recent profile stored for ``filter``, if any.

    Profiles are read newest first and the first one whose stored filter
    matches (per ``compare``) is returned; ``None`` means no match.

    Open question kept from the original: should the alphas be an
    optional argument?

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    wanted = self._profile_filter(filter)
    newest_first = (
        session.query(PrivacyProfileSQL)
        .join(PrivacyProfileSQL.accountant)
        .group_by(PrivacyProfileSQL)
        .order_by(desc(PrivacyProfileSQL.timestamp))
        .all()
    )
    matching = [
        candidate
        for candidate in newest_first
        if compare(wanted, candidate.filter)
    ]
    return matching[0] if matching else None
def save_to_profile(
    self, filter: Optional[PrivateQueryFilter] = None
) -> PrivacyProfileSQL:
    """Snapshot the current (filtered) ledger into a PrivacyProfileSQL.

    The new profile is added to the session but not committed.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    stored_filter = self._profile_filter(filter)
    spent_delta = self.pure_delta(filter)
    spent_rdp = self.rdp(filter)
    taken_at = datetime.datetime.now()
    # The number of stored queries serves as the snapshot's upper bound.
    query_count = session.query(SQLPrivateQuery).count()
    snapshot = PrivacyProfileSQL(
        accountant=self,
        filter=stored_filter,
        rdp=spent_rdp,
        timestamp=taken_at,
        max_id=query_count,
        delta=spent_delta,
    )
    session.add(snapshot)
    return snapshot
def _register(
    self, query: Union[PrivateQuery, Collection[PrivateQuery]]
) -> None:
    """Mark the queries as executed and stage their SQL rows.

    Accepts one query or a collection. Nothing is committed here.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    batch: Collection[PrivateQuery] = (
        {query} if isinstance(query, PrivateQuery) else query
    )
    self._update(status=PrivateQueryStatus.executed, queries=batch)
    sql_rows = [item.sql(session) for item in batch]
    session.add_all(sql_rows)
def _register_rdp(
    self, query: Union[PrivateQuery, Collection[PrivateQuery]]
) -> None:
    """Store the RDP values of the queries in the RDP table.

    Composed queries are expanded with ``all_subqueries()``. Queries that
    already have a row in the table are skipped, and only non-zero RDP
    values are inserted, one row per (query, alpha).

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    roots: Collection[PrivateQuery] = (
        {query} if isinstance(query, PrivateQuery) else query
    )
    candidates: Set[PrivateQuery] = set()
    for root in roots:
        if isinstance(root, ComposedPrivateQuery):
            candidates.update(root.all_subqueries())
        else:
            candidates.add(root)

    def has_stored_rdp(item: PrivateQuery) -> bool:
        """Tell whether the RDP table already holds a row for ``item``."""
        lookup = session.query(RDP).where(RDP.query == str(item.uuid))
        return bool(cast(Query, lookup).first())

    pending = {item for item in candidates if not has_stored_rdp(item)}
    rows = []
    for alpha in self.alphas:
        for item in pending:
            value = item.rdp(alpha)
            if not value:
                continue
            rows.append(
                {'query': str(item.uuid), 'alpha': alpha, 'rdp': value}
            )
    session.bulk_insert_mappings(cast(Mapper, RDP), rows)
def _register_pure_delta(
    self, query: Union[PrivateQuery, Collection[PrivateQuery]]
) -> None:
    """Store the non-RDP cost of the queries in the EpsilonDelta table.

    Composed queries are expanded with ``all_subqueries()``. Queries that
    already have a row are skipped, and a row is staged only when the
    query's delta is non-zero. Nothing is committed here.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    roots: Collection[PrivateQuery] = (
        {query} if isinstance(query, PrivateQuery) else query
    )
    candidates: Set[PrivateQuery] = set()
    for root in roots:
        if isinstance(root, ComposedPrivateQuery):
            candidates.update(root.all_subqueries())
        else:
            candidates.add(root)

    def has_stored_cost(item: PrivateQuery) -> bool:
        """Tell whether EpsilonDelta already holds a row for ``item``."""
        lookup = session.query(EpsilonDelta).where(
            EpsilonDelta.query == str(item.uuid)
        )
        return bool(cast(Query, lookup).first())

    pending = {item for item in candidates if not has_stored_cost(item)}
    staged = []
    for item in pending:
        # pylint: disable=fixme
        epsilon, delta = item.epsilon_delta()  # TODO replace by pure_delta()
        if delta == 0:
            continue
        staged.append(
            EpsilonDelta(query=str(item.uuid), epsilon=epsilon, delta=delta)
        )
    session.add_all(staged)
def register(
    self, query: Union[PrivateQuery, Collection[PrivateQuery]]
) -> RDPPrivacyAccountant:
    """Record the queries and their privacy cost, then commit.

    Runs ``_register``, ``_register_rdp`` and ``_register_pure_delta`` in
    that order and returns the accountant.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    steps = (
        self._register,
        self._register_rdp,
        self._register_pure_delta,
    )
    for step in steps:
        step(query)
    session.commit()
    return self
def verify_register(
    self,
    limit: Optional[PrivacyLimit],
    query: Union[PrivateQuery, Collection[PrivateQuery]],
    filter: Optional[PrivateQueryFilter] = None,
) -> bool:
    """Atomically verify that a query fits in the limit and record it.

    The query is staged inside a nested transaction. If
    ``verify_new_query`` accepts it, its RDP and pure-delta costs are
    stored and the session is committed; otherwise the session is rolled
    back.

    Returns:
        ``True`` if the query was recorded, ``False`` if it was rejected.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    session.begin_nested()
    try:
        self._register(query)
        if self.verify_new_query(query, limit, filter):
            self._register_rdp(query)
            self._register_pure_delta(query)
            session.commit()
            return True
    except Exception:
        # Never leave a staged query behind: a later commit would persist
        # it without its privacy cost having been checked or stored.
        session.rollback()
        raise
    session.rollback()
    return False
def verify_register_multiple(
    self,
    constraints: List[
        Tuple[Optional[PrivacyLimit], Optional[PrivateQueryFilter]]
    ],
    query: Union[PrivateQuery, Collection[PrivateQuery]],
) -> bool:
    """Atomically verify a query against several limits and record it.

    The query is staged inside a nested transaction and checked with
    ``verify_new_query`` against every ``(limit, filter)`` pair, stopping
    at the first rejection. If all pairs accept it, its RDP and pure-delta
    costs are stored and the session is committed; otherwise the session
    is rolled back.

    Returns:
        ``True`` if the query was recorded, ``False`` if it was rejected.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    session.begin_nested()
    try:
        self._register(query)
        if all(
            self.verify_new_query(query, limit, query_filter)
            for (limit, query_filter) in constraints
        ):
            self._register_rdp(query)
            self._register_pure_delta(query)
            session.commit()
            return True
    except Exception:
        # Never leave a staged query behind: a later commit would persist
        # it without its privacy cost having been checked or stored.
        session.rollback()
        raise
    session.rollback()
    return False
def _update(
    self,
    status: PrivateQueryStatus,
    queries: Collection[PrivateQuery],
) -> RDPPrivacyAccountant:
    """Set ``status`` on the given queries and all their subqueries.

    Both the in-memory query objects and their rows in the database are
    updated. When at least one row changed, the profiles whose ``max_id``
    reaches the smallest id of the updated queries are deleted.
    This one does NOT commit to the DB; use update() if you want to.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    affected: List[PrivateQuery] = []
    for item in queries:
        if isinstance(item, ComposedPrivateQuery):
            affected.extend(item.all_subqueries())
        else:
            affected.append(item)
    uuid_set = {str(item.uuid) for item in affected}
    for item in affected:
        item.status = status
    stored_rows = session.query(SQLPrivateQuery).filter(
        SQLPrivateQuery.uuid.in_(uuid_set)
    )
    nb_updated_rows = cast(Query, stored_rows).update(
        {'status': status}, synchronize_session='fetch'
    )
    if not nb_updated_rows:
        return self
    # Remove every PrivacyProfileSQL that contains an updated query.
    smallest_id = session.query(func.min(SQLPrivateQuery.id)).filter(
        SQLPrivateQuery.uuid.in_(uuid_set)
    )
    stale_profiles = session.query(PrivacyProfileSQL).filter(
        PrivacyProfileSQL.max_id >= smallest_id
    )
    cast(Query, stale_profiles).delete(synchronize_session='fetch')
    return self
def update(
    self,
    status: PrivateQueryStatus,
    queries: Collection[PrivateQuery],
) -> RDPPrivacyAccountant:
    """Set ``status`` on the queries and their subqueries, then commit.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if session:
        self._update(status, queries)
        session.commit()
        return self
    raise ValueError('Cannot use this function without a Session !')
def _epsilon_from_rdp(self, rdp: List[float], delta: float) -> float:
if all(v == 0 for v in rdp):
return 0
return cast(
float,
analysis.get_privacy_spent(self.alphas, rdp, target_delta=delta)[
0
],
)
def _delta_from_rdp(self, rdp: List[float], epsilon: float) -> float:
if all(v == 0 for v in rdp):
return 0
return cast(
float,
analysis.get_privacy_spent(self.alphas, rdp, target_eps=epsilon)[
1
],
)
def epsilon(
    self, delta: float, filter: Optional[PrivateQueryFilter] = None
) -> float:
    """Return the epsilon spent by the (filtered) queries at ``delta``.

    The delta consumed by non-RDP mechanisms is subtracted from the target
    before the RDP curve is converted.

    Raises:
        ValueError: if the non-RDP delta already reaches ``delta``.
    """
    pure_part = self.pure_delta(filter)
    if delta <= pure_part:
        raise ValueError(
            "Delta from EpsilonDeltaQueries is larger than "
            "total target delta"
        )
    rdp_curve = self.rdp(filter)
    return self._epsilon_from_rdp(rdp_curve, delta - pure_part)
def delta(
    self, epsilon: float, filter: Optional[PrivateQueryFilter] = None
) -> float:
    """Return the delta spent by the (filtered) queries at ``epsilon``.

    This is the delta derived from the RDP curve plus the delta consumed
    by non-RDP mechanisms.
    """
    pure_part = self.pure_delta(filter)
    rdp_curve = self.rdp(filter)
    rdp_part = self._delta_from_rdp(rdp_curve, epsilon)
    return rdp_part + pure_part
def rdp(self, filter: Optional[PrivateQueryFilter] = None) -> List[float]:
    """Compute the RDP of the current (filtered) queries.

    Only top-level queries are kept so as to not count subqueries twice.
    When a saved profile exists for ``filter``, its RDP is the starting
    point and only queries with an id above the profile's ``max_id`` are
    read from the RDP table.
    """
    top_filter = self._keep_top_queries(filter)
    snapshot = self.profile(filter)
    if snapshot:
        baseline = snapshot.rdp
        conditions = top_filter.filter_condition.copy()
        conditions.append(("id", "gt", snapshot.max_id))
        top_filter = PrivateQueryFilter(conditions)
    else:
        baseline = [0] * len(self.alphas)
    from_table = self._rdp_from_table(top_filter, self.alphas)
    return [saved + fresh for saved, fresh in zip(baseline, from_table)]
def _rdp_from_table(
    self, filter: PrivateQueryFilter, alphas: Sequence[float]
) -> List[float]:
    """Return the summed RDP stored in the rdps table (see RDP class).

    Args:
        filter: conditions selecting the queries to account for.
        alphas: RDP orders to report.

    Returns:
        One value per entry of ``alphas``, in the same order. An alpha
        with no stored row for the selected queries contributes ``0.0``.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    filters = self._filter_sql_queries(filter)
    rdp_alpha = (
        session.query(func.sum(RDP.rdp), RDP.alpha)
        .join(SQLPrivateQuery)
        .filter(and_(*filters))
        .filter(RDP.alpha.in_(alphas))
        .group_by(RDP.alpha)
        .all()
    )
    # Index the sums by alpha so the result lines up with ``alphas``.
    # Callers zip it positionally with their alphas, so returning the rows
    # sorted by alpha (and without the alphas that have no row) silently
    # shifted values whenever ``alphas`` was unsorted or partly missing.
    rdp_by_alpha = {
        alpha: total
        for total, alpha in cast(Iterable[Tuple[float, float]], rdp_alpha)
    }
    return [rdp_by_alpha.get(alpha, 0.0) for alpha in alphas]
def epsilon_delta(
    self, filter: Optional[PrivateQueryFilter] = None
) -> Tuple[float, float]:
    """Return the total (epsilon, delta) of the non-RDP mechanisms.

    Deprecated in favour of ``pure_delta()``. Only top-level queries are
    counted. When a saved profile exists for ``filter``, its delta is
    added to the delta of the queries with an id above the profile's
    ``max_id``; epsilon comes from the table only.
    """
    warn(
        'This is deprecated. Please use pure_delta() for non-rdp privacy '
        'consumption.',
        DeprecationWarning,
        stacklevel=2,
    )
    top_filter = self._keep_top_queries(filter)
    snapshot = self.profile(filter)
    saved_delta = 0.0
    if snapshot:
        saved_delta = snapshot.delta
        conditions = top_filter.filter_condition.copy()
        conditions.append(("id", "gt", snapshot.max_id))
        top_filter = PrivateQueryFilter(conditions)
    # Simple composition for now: the deltas just add up.
    table_epsilon, table_delta = self._epsilon_delta_from_table(top_filter)
    return table_epsilon, saved_delta + table_delta
def _epsilon_delta_from_table(
    self, filter: PrivateQueryFilter
) -> Tuple[float, float]:
    """Return the summed (epsilon, delta) of the EpsilonDelta table.

    Only rows joined to queries matching ``filter`` are summed; ``(0, 0)``
    is returned when the epsilon sum is NULL.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    filters = self._filter_sql_queries(filter)

    def table_sum(column):
        """Sum ``column`` over the rows selected by ``filters``."""
        return (
            session.query(func.sum(column))
            .join(SQLPrivateQuery)
            .filter(and_(*filters))
            .scalar()
        )

    epsilon = table_sum(EpsilonDelta.epsilon)
    delta = table_sum(EpsilonDelta.delta)
    return (0, 0) if epsilon is None else (epsilon, delta)
def pure_delta(self, filter: Optional[PrivateQueryFilter] = None) -> float:
    """Return the total delta of the mechanisms that are not RDP.

    Only top-level queries are counted. When a saved profile exists for
    ``filter``, its delta is added to the delta of the queries with an id
    above the profile's ``max_id``.
    """
    top_filter = self._keep_top_queries(filter)
    snapshot = self.profile(filter)
    saved_delta: float = 0.0
    if snapshot:
        saved_delta = snapshot.delta
        conditions = top_filter.filter_condition.copy()
        conditions.append(("id", "gt", snapshot.max_id))
        top_filter = PrivateQueryFilter(conditions)
    # Simple composition for now: the deltas just add up.
    table_delta: float = self._pure_delta_from_table(top_filter)
    return saved_delta + table_delta
def _pure_delta_from_table(self, filter: PrivateQueryFilter) -> float:
    """Return the summed delta of the EpsilonDelta table.

    Only rows joined to queries matching ``filter`` are summed; 0 is
    returned when the sum is NULL.

    Raises:
        ValueError: if the accountant has no session.
    """
    session = self.session()
    if not session:
        raise ValueError('Cannot use this function without a Session !')
    filters = self._filter_sql_queries(filter)
    summed = session.query(func.sum(EpsilonDelta.delta))
    total = summed.join(SQLPrivateQuery).filter(and_(*filters)).scalar()
    return 0 if total is None else cast(float, total)
def _keep_top_queries(
self, filter: Optional[PrivateQueryFilter]
) -> PrivateQueryFilter:
"""Remove subqueries for rdp and epsilon-delta computations"""
if filter is None:
new_filter = PrivateQueryFilter([("supquery", "eq", "null")])
elif "supquery" not in [row[0] for row in filter.filter_condition]:
new_condition = filter.filter_condition.copy()
new_condition.append(("supquery", "eq", "null"))
new_filter = PrivateQueryFilter(new_condition)
else:
new_filter = filter
return new_filter
def epsilon_new_query(
    self,
    delta: float,
    query: Union[PrivateQuery, Collection[PrivateQuery]],
    filter: Optional[PrivateQueryFilter] = None,
) -> float:
    """Epsilon of accountant + query 'query'.

    Returns infinity when the non-RDP delta alone reaches ``delta``.
    """
    pure_part = self.pure_delta_new_query(query, filter)
    if delta <= pure_part:
        return cast(float, np.inf)
    rdp_curve = self.rdp_new_query(query, filter)
    return self._epsilon_from_rdp(rdp_curve, delta - pure_part)
def delta_new_query(
    self,
    epsilon: float,
    query: Union[PrivateQuery, Collection[PrivateQuery]],
    filter: Optional[PrivateQueryFilter] = None,
) -> float:
    """Delta of accountant + query 'query'.

    Sum of the delta derived from the RDP curve at ``epsilon`` and the
    non-RDP delta.
    """
    pure_part = self.pure_delta_new_query(query, filter)
    rdp_curve = self.rdp_new_query(query, filter)
    rdp_part = self._delta_from_rdp(rdp_curve, epsilon)
    return rdp_part + pure_part
def rdp_new_query(
    self,
    query: Union[PrivateQuery, Collection[PrivateQuery]],
    filter: Optional[PrivateQueryFilter] = None,
) -> List[float]:
    """RDP of accountant + query 'query'.

    From a collection only the queries of depth 0 are kept. The given
    queries are excluded from the stored ledger (by uuid) and their own
    RDP is added on top, one value per alpha.
    """
    if isinstance(query, PrivateQuery):
        queries: Collection[PrivateQuery] = {query}
    else:
        queries = {item for item in query if item.depth == 0}
    ledger_filter = self._add_uuid_to_filter(filter, queries)
    totals = []
    for alpha, recorded in zip(self.alphas, self.rdp(ledger_filter)):
        added = sum([cast(float, item.rdp(alpha)) for item in queries])
        totals.append(recorded + added)
    return totals
def epsilon_delta_new_query(
    self,
    query: Union[PrivateQuery, Collection[PrivateQuery]],
    filter: Optional[PrivateQueryFilter] = None,
) -> Tuple[float, float]:
    """(Epsilon, delta) of accountant + query 'query'.

    Deprecated in favour of ``pure_delta_new_query()``. From a collection
    only the queries of depth 0 are kept. The given queries are excluded
    from the stored ledger (by uuid) and their own cost is added on top.
    """
    warn(
        'This is deprecated. Please use pure_delta_new_query() for '
        'non-rdp privacy consumption.',
        DeprecationWarning,
        stacklevel=2,
    )
    if isinstance(query, PrivateQuery):
        queries: Collection[PrivateQuery] = {query}
    else:
        queries = {item for item in query if item.depth == 0}
    ledger_filter = self._add_uuid_to_filter(filter, queries)
    ledger_eps, ledger_delta = self.epsilon_delta(ledger_filter)
    costs = [item.epsilon_delta() for item in queries]
    added_eps = sum([cost[0] for cost in costs])
    added_delta = sum([cost[1] for cost in costs])
    return ledger_eps + added_eps, ledger_delta + added_delta
def pure_delta_new_query(
    self,
    query: Union[PrivateQuery, Collection[PrivateQuery]],
    filter: Optional[PrivateQueryFilter] = None,
) -> float:
    """Non-RDP delta of accountant + query 'query'.

    From a collection only the queries of depth 0 are kept. The given
    queries are excluded from the stored ledger (by uuid) and their own
    pure delta is added on top.
    """
    if isinstance(query, PrivateQuery):
        queries: Collection[PrivateQuery] = {query}
    else:
        queries = {item for item in query if item.depth == 0}
    ledger_filter = self._add_uuid_to_filter(filter, queries)
    ledger_delta = self.pure_delta(ledger_filter)
    added_delta = sum([item.pure_delta() for item in queries])
    return ledger_delta + added_delta
def _add_uuid_to_filter(
    self,
    filter: Optional[PrivateQueryFilter],
    queries: Collection[PrivateQuery],
) -> PrivateQueryFilter:
    """Return ``filter`` extended to exclude ``queries`` by uuid.

    Composed queries are expanded with ``all_subqueries()`` so that their
    subqueries are excluded as well. The input filter is not modified.
    """
    # Retrieve all queries & subqueries.
    excluded: Set[PrivateQuery] = set()
    for item in queries:
        if isinstance(item, ComposedPrivateQuery):
            excluded.update(item.all_subqueries())
        else:
            excluded.add(item)
    if filter is None:
        conditions = []
    else:
        conditions = filter.filter_condition.copy()
    conditions.append(("uuid", "not in", [str(q.uuid) for q in excluded]))
    return PrivateQueryFilter(conditions)
def epsilon_query(
    self,
    delta: float,
    query: Union[PrivateQuery, Collection[PrivateQuery]],
) -> float:
    """Epsilon of single query 'query'.

    Returns infinity when the query's non-RDP delta alone reaches
    ``delta``.
    """
    pure_part = self.pure_delta_query(query)
    if delta <= pure_part:
        return cast(float, np.inf)
    rdp_curve = self.rdp_query(query)
    return self._epsilon_from_rdp(rdp_curve, delta - pure_part)
def delta_query(
    self,
    epsilon: float,
    query: Union[PrivateQuery, Collection[PrivateQuery]],
) -> float:
    """Delta of single query 'query'.

    Sum of the delta derived from the query's RDP curve at ``epsilon`` and
    its non-RDP delta.
    """
    pure_part = self.pure_delta_query(query)
    rdp_curve = self.rdp_query(query)
    rdp_part = self._delta_from_rdp(rdp_curve, epsilon)
    return rdp_part + pure_part
def rdp_query(
    self, query: Union[PrivateQuery, Collection[PrivateQuery]]
) -> List[float]:
    """RDP of single query 'query', one value per alpha.

    From a collection only the queries of depth 0 are kept and their RDP
    values are summed.
    """
    if isinstance(query, PrivateQuery):
        queries: Collection[PrivateQuery] = {query}
    else:
        queries = {item for item in query if item.depth == 0}
    curve = []
    for alpha in self.alphas:
        per_query = [item.rdp(alpha) for item in queries]
        curve.append(sum(per_query))  # type: ignore[misc]
    return curve
def epsilon_delta_query(
    self, query: Union[PrivateQuery, Collection[PrivateQuery]]
) -> Tuple[float, float]:
    """(Epsilon, delta) of query 'query'.

    Deprecated in favour of ``pure_delta_query()``. From a collection only
    the queries of depth 0 are kept and their costs are summed.
    """
    warn(
        'This is deprecated. Please use pure_delta_query() for non-rdp '
        'privacy consumption.',
        DeprecationWarning,
        stacklevel=2,
    )
    if isinstance(query, PrivateQuery):
        queries: Collection[PrivateQuery] = {query}
    else:
        queries = {item for item in query if item.depth == 0}
    costs = [item.epsilon_delta() for item in queries]
    total_eps = sum([cost[0] for cost in costs])
    total_delta = sum([cost[1] for cost in costs])
    return total_eps, total_delta
def pure_delta_query(
    self, query: Union[PrivateQuery, Collection[PrivateQuery]]
) -> float:
    """Non-RDP delta of query 'query'.

    From a collection only the queries of depth 0 are kept and their pure
    deltas are summed.
    """
    if isinstance(query, PrivateQuery):
        queries: Collection[PrivateQuery] = {query}
    else:
        queries = {item for item in query if item.depth == 0}
    deltas = [item.pure_delta() for item in queries]
    return sum(deltas)
class RDP(Base):  # pylint: disable=too-few-public-methods
    # see https://github.com/PyCQA/pylint/issues/3525
    """Stores rdp as (id, query, alpha, rdp).

    This way, the rdp is not recomputed each time.
    """

    __tablename__ = "rdp"
    id = mapped_column(Integer, primary_key=True)
    # uuid of the query the value belongs to (``query.uuid`` foreign key).
    query = mapped_column(String, ForeignKey("query.uuid", onupdate="cascade"))
    # RDP order, and the RDP value of the query at that order.
    alpha = mapped_column(Float)
    rdp = mapped_column(Float)
    # Composite index over (query, alpha).
    __table_args__ = (Index("idx_query_alpha", "query", "alpha"),)
class EpsilonDelta(Base):  # pylint: disable=too-few-public-methods
    """Stores non-rdp (epsilon, delta) queries."""

    __tablename__ = "epsilon_delta"
    id = mapped_column(Integer, primary_key=True)
    # uuid of the query the cost belongs to (``query.uuid`` foreign key).
    query = mapped_column(String, ForeignKey("query.uuid", onupdate="cascade"))
    epsilon = mapped_column(Float)
    delta = mapped_column(Float)