Repository URL to install this package:
|
Version:
1.0.1 ▾
|
# pylint: disable=redefined-builtin, too-many-lines, too-few-public-methods
from __future__ import annotations
import enum
import math
from typing import Iterable, List, Optional, Sequence, Tuple, Union, cast
from uuid import UUID, uuid4
from sqlalchemy import Enum, ForeignKey, Integer, String
from sqlalchemy.orm import Query, backref, mapped_column, relationship
from sqlalchemy.orm.session import Session
from sarus_differential_privacy import analysis
from sarus_differential_privacy.protobuf.private_query_pb2 import (
PrivateQuery as ProtoPrivateQuery,
)
from sarus_differential_privacy.sample import Sample
from sarus_differential_privacy.utils import Base, Json
# pylint: disable=invalid-name
class PrivateQueryStatus(enum.Enum):
    """Possible statuses of a private query.

    A query starts out ``inactive`` (before it is registered) and becomes
    ``executed`` afterwards. ``waived`` is the third available status.
    """

    inactive = 0
    executed = 1
    waived = 2
# pylint: disable=invalid-name
class PrivateQueryType(enum.Enum):
    """Discriminator identifying the concrete kind of a private query.

    The member *names* are what gets written to / read from the protobuf
    ``type`` field, and the members themselves are stored in the SQL
    ``type`` column.
    """

    # Default type set by BasePrivateQuery.__init__.
    base_private_query = 0

    # Queries wrapping other queries.
    composed_query = 1
    sampled_query = 2

    # Leaf mechanisms described by their parameters only.
    gaussian_query = 3
    laplace_query = 4
    randomized_response_query = 5
    epsilon_query = 6
    sgm_query = 7
    epsilon_delta_query = 8

    # Repetition of a single sub-query.
    multiple_query = 9
class SQLPrivateQuery(Base):
    """SQL equivalent of BasePrivateQuery.

    Rows live in the ``query`` table. The table is self-referential:
    ``supquery_id`` points at the parent row, ``subqueries`` lists the
    children and the ``supquery`` backref gives access to the parent.
    """

    __tablename__ = "query"
    __table_args__ = {"extend_existing": True}

    id = mapped_column(Integer, primary_key=True)
    # String form of the UUID of the corresponding PrivateQuery object.
    uuid = mapped_column(String, unique=True)
    type = mapped_column(Enum(PrivateQueryType))
    status = mapped_column(Enum(PrivateQueryStatus))
    # Keyword arguments of the query constructor (see ``private_query``).
    parameters = mapped_column(Json(128))
    # NOTE(review): despite the ``_id`` suffix this is a relationship to the
    # "RDP" mapped class, not a foreign-key column -- confirm intended name.
    rdp_id = relationship("RDP")
    supquery_id = mapped_column(Integer, ForeignKey(id, onupdate="cascade"))
    subqueries = relationship(
        "SQLPrivateQuery",
        cascade="all, delete-orphan",
        backref=backref("supquery", remote_side=id),
        collection_class=list,
        enable_typechecks=False,
    )
    sample = relationship("SQLSample")
    sample_id = mapped_column(Integer, ForeignKey("sample.id"))
    __mapper_args__ = {
        'polymorphic_on': type,
    }

    def private_query(self) -> BasePrivateQuery:
        """Return the corresponding BasePrivateQuery object.

        Sub-queries are converted recursively. The UUID and status stored
        in the row are copied onto the rebuilt object.

        Raises:
            ValueError: if ``self.type`` is not a type this method can
                rebuild (e.g. ``base_private_query`` or ``None``).
            IndexError: if a sampled or multiple query has no sub-query.
        """
        # Leaf query types, rebuilt from their stored parameters alone.
        parameter_only = {
            PrivateQueryType.gaussian_query: GaussianQuery,
            PrivateQueryType.laplace_query: LaplaceQuery,
            PrivateQueryType.randomized_response_query: (
                RandomizedResponseQuery
            ),
            PrivateQueryType.epsilon_query: EpsilonQuery,
            PrivateQueryType.sgm_query: SampledGaussianMechanismQuery,
            PrivateQueryType.epsilon_delta_query: EpsilonDeltaQuery,
        }
        query: BasePrivateQuery
        if self.type == PrivateQueryType.composed_query:
            query = ComposedQuery(
                [q.private_query() for q in self.subqueries]
            )
        elif self.type == PrivateQueryType.sampled_query:
            query = SampledQuery(
                list(self.subqueries)[0].private_query(),
                self.sample.sample(),
            )
        elif self.type == PrivateQueryType.multiple_query:
            query = MultipleQuery(
                list(self.subqueries)[0].private_query(), **self.parameters
            )
        elif self.type in parameter_only:
            query = parameter_only[self.type](**self.parameters)
        else:
            # Without this branch ``query`` would be unbound below and the
            # caller would get an opaque UnboundLocalError.
            raise ValueError(f'type not recognized: {self.type}')
        query._uuid = UUID(self.uuid)  # pylint: disable=protected-access
        query.status = self.status
        return query
# pylint: disable=unnecessary-pass
class PrivateQuery:
    """Interface shared by every private query.

    All members are placeholders that return ``None``; the concrete
    behaviour is provided by subclasses.
    """

    @property
    def type(self) -> PrivateQueryType:
        """Kind of query (gaussian, laplace, composed, ...)."""

    @property
    def status(self) -> PrivateQueryStatus:
        """Current status: inactive, executed or waived."""

    @status.setter
    def status(self, value: PrivateQueryStatus) -> None:
        """Placeholder setter for the status; does nothing here."""

    @property
    def parameters(self) -> Optional[dict]:
        """Query parameters; ``None`` for sampled and composed queries."""

    @property
    def uuid(self) -> UUID:
        """Unique identifier of the query."""

    @property
    def depth(self) -> int:
        """Number of super-queries between this query and the root node."""

    @depth.setter
    def depth(self, value: int) -> None:
        """Placeholder setter for the depth; does nothing here."""

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQLPrivateQuery equivalent of this query."""

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the ProtoPrivateQuery equivalent of this query."""

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """RDP consumption of the query for one alpha or a list of alphas."""

    def epsilon_delta(self) -> Tuple[float, float]:
        """Return the non-RDP (epsilon, delta) pair.

        Meant to be overridden; the documented default is (0, 0).
        """

    def pure_delta(self) -> float:
        """Return the delta part of an EpsilonDeltaQuery."""
class ComposedPrivateQuery(PrivateQuery):
    """Interface of private queries that wrap other queries."""

    @property
    def subqueries(self) -> Sequence[PrivateQuery]:
        """Direct sub-queries (composed, sampled and multiple queries)."""

    def all_subqueries(self) -> Sequence[PrivateQuery]:
        """Return this query together with all of its descendants."""
class SampledPrivateQuery(ComposedPrivateQuery):
    """Interface of private queries that carry a sampling step."""

    @property
    def sample(self) -> Sample:
        """Sample attached to the query."""
class BasePrivateQuery(PrivateQuery):
    """Base class for private queries.

    Stores the state common to every query (type, status, parameters,
    uuid, depth and a per-alpha RDP cache) and implements the static
    conversion helpers from/to SQL and protobuf.
    """

    def __init__(self) -> None:
        self._type = PrivateQueryType.base_private_query
        self._status = PrivateQueryStatus.inactive
        self._parameters: Optional[dict] = None
        self._uuid = uuid4()
        self.depth = 0
        # Cache alpha -> RDP value, filled by the subclasses' rdp().
        self._rdp: dict[float, float] = {}

    @property
    def type(self) -> PrivateQueryType:
        return self._type

    @property
    def status(self) -> PrivateQueryStatus:
        return self._status

    @status.setter
    def status(self, value: PrivateQueryStatus) -> None:
        self._status = value

    @property
    def parameters(self) -> Optional[dict]:
        return self._parameters

    @property
    def uuid(self) -> UUID:
        return self._uuid

    @property
    def depth(self) -> int:
        return self._depth

    @depth.setter
    def depth(self, value: int) -> None:
        self._depth = value

    def epsilon_delta(self) -> Tuple[float, float]:
        """Returns non-rdp (epsilon, delta) pair.
        By default, it is (0, 0). To be overwritten"""
        return (0, 0)

    def pure_delta(self) -> float:
        """Return the pure delta of the query; 0 by default."""
        return 0

    @staticmethod
    def from_sql(sql_object: SQLPrivateQuery) -> PrivateQuery:
        """Build PrivateQuery from SQLPrivateQuery object"""
        return sql_object.private_query()

    # pylint: disable=too-many-branches
    @staticmethod
    def from_protobuf(
        protobuf: Sequence[ProtoPrivateQuery],
    ) -> List['BasePrivateQuery']:
        """Build a list of PrivateQuery from a sequence of ProtoPrivateQuery
        objects.

        The messages may come in any order, but every sub-query referenced
        by a message must itself be part of ``protobuf``.

        Returns:
            One query per distinct uuid, in order of first appearance.

        Raises:
            ValueError: if a message has an unknown ``type``.
            KeyError: if a referenced sub-query uuid (or a status name) is
                unknown.
        """
        queries = {}
        # Pass 1: instantiate every query on its own, without sub-queries.
        for q_proto in protobuf:
            if q_proto.type == PrivateQueryType.composed_query.name:
                query: BasePrivateQuery = ComposedQuery([])
            elif q_proto.type == PrivateQueryType.sampled_query.name:
                query = SampledQuery(
                    None, Sample.from_protobuf(q_proto.sample)
                )
            elif q_proto.type == PrivateQueryType.gaussian_query.name:
                query = GaussianQuery(q_proto.parameters['noise'])
            elif q_proto.type == PrivateQueryType.laplace_query.name:
                query = LaplaceQuery(q_proto.parameters['noise'])
            elif (
                q_proto.type == PrivateQueryType.randomized_response_query.name
            ):
                query = RandomizedResponseQuery(
                    q_proto.parameters['probability']
                )
            elif q_proto.type == PrivateQueryType.epsilon_query.name:
                query = EpsilonQuery(q_proto.parameters['epsilon'])
            elif q_proto.type == PrivateQueryType.sgm_query.name:
                query = SampledGaussianMechanismQuery(
                    q_proto.parameters['sampling_probability'],
                    q_proto.parameters['noise_multiplier'],
                    int(q_proto.parameters['steps']),
                )
            elif q_proto.type == PrivateQueryType.epsilon_delta_query.name:
                query = EpsilonDeltaQuery(
                    q_proto.parameters['epsilon'], q_proto.parameters['delta']
                )
            elif q_proto.type == PrivateQueryType.multiple_query.name:
                query = MultipleQuery(
                    None, int(q_proto.parameters['multiplicity'])
                )
            else:
                raise ValueError(f'type not recognized: {q_proto.type}')
            query._uuid = UUID(  # pylint: disable=protected-access
                q_proto.uuid
            )
            query.status = PrivateQueryStatus[q_proto.status]
            queries[q_proto.uuid] = (query, q_proto)

        # Pass 2: wire up the whole tree before touching any depth, so that
        # all_subqueries() below sees complete subtrees whatever the order
        # of the messages.
        for q, q_proto in queries.values():
            if q_proto.subqueries:
                # pylint: disable=protected-access
                q._subqueries += [  # type: ignore[attr-defined]
                    queries[subq][0] for subq in q_proto.subqueries
                ]

        # Pass 3: every query with sub-queries pushes all of its descendants
        # one level down.
        for q, q_proto in queries.values():
            if q_proto.subqueries:
                subqueries = q.all_subqueries()  # type: ignore[attr-defined]
                # all_subqueries() lists q itself first; it is not its own
                # descendant.
                subqueries.remove(q)
                for subq in subqueries:
                    subq.depth += 1
        return [q[0] for q in queries.values()]

    @staticmethod
    def to_protobuf(
        queries: Iterable[PrivateQuery],
    ) -> List[ProtoPrivateQuery]:
        """Convert a sequence of queries to a list of ProtoPrivateQuery
        If composed or sampled queries, all descendants must be in queries"""
        return [q.protobuf() for q in queries]
class BaseComposedPrivateQuery(BasePrivateQuery, ComposedPrivateQuery):
    """Common implementation for queries that hold sub-queries."""

    def __init__(self) -> None:
        super().__init__()
        self._subqueries: List[PrivateQuery] = []

    @property
    def subqueries(self) -> List[PrivateQuery]:
        return self._subqueries

    def all_subqueries(self) -> List[PrivateQuery]:
        """Return ``self`` followed by all its descendants, depth first.

        A query reachable through several paths appears once per path.
        """
        collected: List[PrivateQuery] = [self]
        for child in self.subqueries:
            if not isinstance(child, ComposedPrivateQuery):
                collected.append(child)
                continue
            # Composite child: it contributes itself and its own subtree.
            collected.extend(child.all_subqueries())
        return collected
def remove_duplicates_preserve_order(lst: Iterable) -> List:
    """Return the distinct elements of ``lst`` in order of first appearance.

    Equivalent of ``list(set(lst))`` but preserving the input order.
    Elements must be hashable. Any iterable is accepted and a new list is
    always returned.
    """
    # dict keys are unique and keep insertion order.
    return list(dict.fromkeys(lst))
class ComposedQuery(BaseComposedPrivateQuery):
    """Composition of several private queries."""

    def __init__(
        self,
        queries: List[PrivateQuery],
    ) -> None:
        """Wrap ``queries``; each of them (and its subtree) goes one level
        deeper. Duplicated entries are stored only once."""
        super().__init__()
        self._type = PrivateQueryType.composed_query
        for member in queries:
            if isinstance(member, ComposedPrivateQuery):
                # all_subqueries() already contains ``member`` itself.
                pushed_down = member.all_subqueries()
            else:
                pushed_down = [member]
            for node in pushed_down:
                node.depth += 1
            self._subqueries.append(member)
        self._subqueries = remove_duplicates_preserve_order(self._subqueries)

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """Sum of the sub-queries' RDP at ``alpha`` (cached per alpha)."""
        if isinstance(alpha, list):
            return [cast(float, self.rdp(a)) for a in alpha]
        if alpha not in self._rdp:
            self._rdp[alpha] = sum(
                cast(float, q.rdp(alpha)) for q in self.subqueries
            )
        return self._rdp[alpha]

    def epsilon_delta(self) -> Tuple[float, float]:
        """Component-wise sum of the sub-queries' (epsilon, delta) pairs."""
        pairs = [q.epsilon_delta() for q in self.subqueries]
        total_epsilon = sum(pair[0] for pair in pairs)
        total_delta = sum(pair[1] for pair in pairs)
        return (total_epsilon, total_delta)

    def pure_delta(self) -> float:
        """Sum of the sub-queries' pure deltas."""
        return sum(q.pure_delta() for q in self.subqueries)

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQL row for this query.

        With a session, a row already stored under this uuid is returned
        as is; otherwise a new row is built and added to the session.
        """
        if session:
            lookup = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            )
            stored = lookup.one_or_none()
            if stored:
                return cast(SQLPrivateQuery, stored)
        children = [q.sql(session) for q in self.subqueries]
        row = SQLPrivateQuery(
            type=PrivateQueryType.composed_query,
            uuid=str(self.uuid),
            status=self.status,
            subqueries=children,
        )
        if session:
            session.add(row)
        return row

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the protobuf message; sub-queries are listed by uuid."""
        child_uuids = [str(subq.uuid) for subq in self.subqueries]
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            subqueries=child_uuids,
        )
class SampledQuery(BaseComposedPrivateQuery, SampledPrivateQuery):
    """Query `query` run through the sampling `sample`."""

    def __init__(
        self,
        query: Optional[PrivateQuery],
        sample: Sample,
    ) -> None:
        """Attach ``sample`` and, when given, ``query`` as the only
        sub-query (it and its subtree go one level deeper)."""
        super().__init__()
        self._type = PrivateQueryType.sampled_query
        self._sample = sample
        if query:
            if isinstance(query, ComposedPrivateQuery):
                # all_subqueries() already contains ``query`` itself.
                pushed_down = query.all_subqueries()
            else:
                pushed_down = [query]
            for node in pushed_down:
                node.depth += 1
            self._subqueries.append(query)

    @property
    def sample(self) -> Sample:
        return self._sample

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """RDP of the first sub-query as computed by the sample (cached)."""
        # Looked up first so a missing sub-query fails whatever ``alpha`` is.
        inner = self.subqueries[0]
        if isinstance(alpha, list):
            return [cast(float, self.rdp(a)) for a in alpha]
        if alpha in self._rdp:
            return self._rdp[alpha]
        value = self.sample.rdp(alpha, inner)
        self._rdp[alpha] = value
        return value

    def epsilon_delta(self) -> Tuple[float, float]:
        """(epsilon, delta) of the first sub-query as computed by the
        sample."""
        inner = self.subqueries[0]
        return self.sample.epsilon_delta(inner)

    def pure_delta(self) -> float:
        """Pure delta of the first sub-query as computed by the sample."""
        inner = self.subqueries[0]
        return self.sample.pure_delta(inner)

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQL row for this query.

        With a session, a row already stored under this uuid is returned
        as is; otherwise a new row is built and added to the session.
        """
        if session:
            lookup = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            )
            stored = lookup.one_or_none()
            if stored:
                return cast(SQLPrivateQuery, stored)
        row = SQLPrivateQuery(
            type=PrivateQueryType.sampled_query,
            uuid=str(self.uuid),
            status=self.status,
            subqueries=[q.sql(session) for q in self.subqueries],
            sample=self.sample.sql(session),
        )
        if session:
            session.add(row)
        return row

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the protobuf message, including the sample and the
        sub-queries' uuids."""
        message = ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            sample=self.sample.protobuf(),
            subqueries=[str(subq.uuid) for subq in self.subqueries],
        )
        return message
class GaussianQuery(BasePrivateQuery):
    """Query described by a single Gaussian ``noise`` parameter."""

    def __init__(self, noise: float):
        super().__init__()
        self._type = PrivateQueryType.gaussian_query
        self._parameters: dict = {"noise": noise}

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """Return ``alpha / (2 * noise ** 2)``, cached per alpha.

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        if isinstance(alpha, list):
            return [cast(float, self.rdp(a)) for a in alpha]
        if alpha not in self._rdp:
            noise = self.parameters["noise"]
            self._rdp[alpha] = alpha / (2 * (noise ** 2))
        return self._rdp[alpha]

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQL row for this query.

        With a session, a row already stored under this uuid is returned
        as is; otherwise a new row is built and added to the session.
        """
        if session:
            lookup = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            )
            stored = lookup.one_or_none()
            if stored:
                return cast(SQLPrivateQuery, stored)
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        row = SQLPrivateQuery(
            type=PrivateQueryType.gaussian_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(row)
        return row

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the protobuf message carrying uuid, type, status and
        parameters."""
        message = ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )
        return message
class LaplaceQuery(BasePrivateQuery):
    """Query described by a single Laplace ``noise`` parameter."""

    def __init__(self, noise: float):
        super().__init__()
        self._type = PrivateQueryType.laplace_query
        self._parameters: dict = {"noise": noise}

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """RDP of the query at ``alpha`` (or each alpha of a list), cached.

        ``alpha == 1`` uses a dedicated expression. The result is clipped
        at 0 and becomes ``inf`` if the computation overflows.

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        scale: float = self.parameters["noise"]
        if isinstance(alpha, list):
            return [cast(float, self.rdp(a)) for a in alpha]
        if alpha in self._rdp:
            return self._rdp[alpha]
        if alpha == 1:
            value = max(0, 1 / scale + math.exp(-1 / scale) - 1)
        else:
            try:
                mixture = alpha + (alpha - 1) * math.exp(
                    -(2 * alpha - 1) / scale
                )
                log_term = math.log(mixture / (2 * alpha - 1))
                value = max(0, 1.0 / scale + log_term / (alpha - 1))
            except OverflowError:  # if exp too high
                value = float("inf")
        self._rdp[alpha] = value
        return value

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQL row for this query.

        With a session, a row already stored under this uuid is returned
        as is; otherwise a new row is built and added to the session.
        """
        if session:
            lookup = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            )
            stored = lookup.one_or_none()
            if stored:
                return cast(SQLPrivateQuery, stored)
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        row = SQLPrivateQuery(
            type=PrivateQueryType.laplace_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(row)
        return row

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the protobuf message carrying uuid, type, status and
        parameters."""
        message = ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )
        return message
class RandomizedResponseQuery(BasePrivateQuery):
    """Query described by a single ``probability`` parameter."""

    def __init__(self, probability: float):
        super().__init__()
        self._type = PrivateQueryType.randomized_response_query
        self._parameters: dict = {"probability": probability}

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """RDP of the query at ``alpha`` (or each alpha of a list), cached.

        ``alpha == 1`` uses a dedicated expression.

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        p = self.parameters["probability"]
        if isinstance(alpha, list):
            return [cast(float, self.rdp(a)) for a in alpha]
        if alpha in self._rdp:
            return self._rdp[alpha]
        if alpha == 1:
            value = (2 * p - 1) * math.log(p / (1 - p))
        else:
            inverse_order = 1 / (alpha - 1)
            moment = math.pow(p, alpha) * math.pow(
                1 - p, 1 - alpha
            ) + math.pow(1 - p, alpha) * math.pow(p, 1 - alpha)
            value = inverse_order * math.log(moment)
        self._rdp[alpha] = value
        return value  # type: ignore[no-any-return]

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQL row for this query.

        With a session, a row already stored under this uuid is returned
        as is; otherwise a new row is built and added to the session.
        """
        if session:
            lookup = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            )
            stored = lookup.one_or_none()
            if stored:
                return cast(SQLPrivateQuery, stored)
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        row = SQLPrivateQuery(
            type=PrivateQueryType.randomized_response_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(row)
        return row

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the protobuf message carrying uuid, type, status and
        parameters."""
        message = ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )
        return message
class EpsilonQuery(BasePrivateQuery):
    """Pure-DP query, described by a single ``epsilon`` parameter."""

    def __init__(self, epsilon: float):
        super().__init__()
        self._type = PrivateQueryType.epsilon_query
        self._parameters: dict = {"epsilon": epsilon}

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """RDP of the query at ``alpha`` (or each alpha of a list), cached.

        For ``alpha != 1`` and ``epsilon != 0`` the value is::

            eps + (log(1 - e^(-2*alpha*eps) - e^(-eps) + e^(-(2*alpha-1)*eps))
                   - log(1 - e^(-2*eps))) / (alpha - 1)

        clipped at 0. The two logarithm arguments share the factor
        ``1 - e^(-eps)``, so this equals
        ``eps + log((1 + e^(-(2*alpha-1)*eps)) / (1 + e^(-eps))) / (alpha - 1)``.
        Two inputs cannot be evaluated with the formula as written and are
        handled separately:

        * ``epsilon == 0``: both logarithms would receive 0; the value is 0.
        * ``alpha == 1``: the division by ``alpha - 1`` is undefined; the
          limit of the expression, ``eps * tanh(eps / 2)``, is returned.

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        eps: float = self.parameters["epsilon"]
        if not isinstance(alpha, list):
            if alpha in self._rdp:
                return self._rdp[alpha]
            rdp_alpha: float
            if eps == 0:
                # Avoids math.log(0) in the general formula.
                rdp_alpha = 0.0
            elif alpha == 1:
                # Limit of the general formula when alpha -> 1.
                rdp_alpha = eps * math.tanh(eps / 2)
            else:
                rdp_alpha = max(
                    0,
                    eps
                    + (
                        math.log(
                            1
                            - math.exp(-2 * alpha * eps)
                            - math.exp(-eps)
                            + math.exp(-(2 * alpha - 1) * eps)
                        )
                        - math.log(1 - math.exp(-2 * eps))
                    )
                    / (alpha - 1),
                )
            self._rdp[alpha] = rdp_alpha
            return rdp_alpha
        return [cast(float, self.rdp(a)) for a in alpha]

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQL row for this query.

        With a session, a row already stored under this uuid is returned
        as is; otherwise a new row is built and added to the session.
        """
        if session:
            existing_query = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            ).one_or_none()
            if existing_query:
                return cast(SQLPrivateQuery, existing_query)
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        sql_query = SQLPrivateQuery(
            type=PrivateQueryType.epsilon_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(sql_query)
        return sql_query

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the protobuf message carrying uuid, type, status and
        parameters."""
        return ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )
class SampledGaussianMechanismQuery(BasePrivateQuery):
    """SGM query: sampling probability, noise multiplier and step count."""

    def __init__(
        self,
        sampling_probability: float,
        noise_multiplier: float,
        steps: int,
    ) -> None:
        super().__init__()
        self._type = PrivateQueryType.sgm_query
        self._parameters: dict = {
            "sampling_probability": sampling_probability,
            "noise_multiplier": noise_multiplier,
            "steps": steps,
        }

    @property
    def parameters(self) -> dict:
        return self._parameters

    # pylint: disable=protected-access
    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """RDP at ``alpha`` (or each alpha of a list), cached per alpha.

        The value is ``analysis._compute_rdp(...)`` multiplied by the
        number of steps, clipped at 0.

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        if isinstance(alpha, list):
            return [cast(float, self.rdp(a)) for a in alpha]
        if alpha in self._rdp:
            return self._rdp[alpha]
        single_step = analysis._compute_rdp(
            self.parameters["sampling_probability"],
            self.parameters["noise_multiplier"],
            alpha,
        )
        value = max(0, single_step * self.parameters["steps"])
        self._rdp[alpha] = value
        return value  # type: ignore[no-any-return]

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQL row for this query.

        With a session, a row already stored under this uuid is returned
        as is; otherwise a new row is built and added to the session.
        """
        if session:
            lookup = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            )
            stored = lookup.one_or_none()
            if stored:
                return cast(SQLPrivateQuery, stored)
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        row = SQLPrivateQuery(
            type=PrivateQueryType.sgm_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(row)
        return row

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the protobuf message carrying uuid, type, status and
        parameters."""
        message = ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )
        return message
class EpsilonDeltaQuery(BasePrivateQuery):
    """Generic approximate-DP query with ``epsilon`` and ``delta``."""

    def __init__(
        self,
        epsilon: float,
        delta: float,
    ):
        super().__init__()
        self._type = PrivateQueryType.epsilon_delta_query
        self._parameters: dict = {
            "epsilon": epsilon,
            "delta": delta,
        }

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """RDP of a fresh EpsilonQuery built with this query's epsilon."""
        epsilon_only = EpsilonQuery(epsilon=self.parameters['epsilon'])
        return epsilon_only.rdp(alpha)

    def epsilon_delta(self) -> Tuple[float, float]:
        """Return the stored (epsilon, delta) pair.

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        params = self.parameters
        return (params["epsilon"], params["delta"])

    def pure_delta(self) -> float:
        """Return the stored delta.

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        return cast(float, self.parameters["delta"])

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQL row for this query.

        With a session, a row already stored under this uuid is returned
        as is; otherwise a new row is built and added to the session.
        """
        if session:
            lookup = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            )
            stored = lookup.one_or_none()
            if stored:
                return cast(SQLPrivateQuery, stored)
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        row = SQLPrivateQuery(
            type=PrivateQueryType.epsilon_delta_query,
            uuid=str(self.uuid),
            status=self.status,
            parameters=self.parameters,
        )
        if session:
            session.add(row)
        return row

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the protobuf message carrying uuid, type, status and
        parameters."""
        message = ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
        )
        return message
class MultipleQuery(BaseComposedPrivateQuery):
    """The same query applied ``multiplicity`` times."""

    def __init__(
        self,
        query: Optional[PrivateQuery],
        multiplicity: int,
    ):
        """Store ``multiplicity`` and, when given, ``query`` as the only
        sub-query (it and its subtree go one level deeper)."""
        super().__init__()
        self._type = PrivateQueryType.multiple_query
        self._parameters: dict = {
            'multiplicity': multiplicity,
        }
        if query:
            if isinstance(query, ComposedPrivateQuery):
                # all_subqueries() already contains ``query`` itself.
                pushed_down = query.all_subqueries()
            else:
                pushed_down = [query]
            for node in pushed_down:
                node.depth += 1
            self._subqueries.append(query)

    @property
    def parameters(self) -> dict:
        return self._parameters

    def rdp(
        self, alpha: Union[float, List[float]]
    ) -> Union[float, List[float]]:
        """RDP of the first sub-query times the multiplicity (cached).

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        # Looked up first so a missing sub-query fails whatever ``alpha`` is.
        inner = self.subqueries[0]
        if isinstance(alpha, list):
            return [cast(float, self.rdp(a)) for a in alpha]
        if alpha in self._rdp:
            return self._rdp[alpha]
        value = cast(
            float, inner.rdp(alpha) * self.parameters['multiplicity']
        )
        self._rdp[alpha] = value
        return value

    def epsilon_delta(self) -> Tuple[float, float]:
        """(epsilon, delta) of the first sub-query, each scaled by the
        multiplicity.

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        inner_eps, inner_delta = self.subqueries[0].epsilon_delta()
        return (
            inner_eps * self.parameters['multiplicity'],
            inner_delta * self.parameters['multiplicity'],
        )

    def pure_delta(self) -> float:
        """Pure delta of the first sub-query scaled by the multiplicity.

        Raises:
            ValueError: if the parameters are empty.
        """
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        inner_delta = self.subqueries[0].pure_delta()
        return inner_delta * cast(float, self.parameters['multiplicity'])

    def sql(self, session: Optional[Session] = None) -> SQLPrivateQuery:
        """Return the SQL row for this query.

        With a session, a row already stored under this uuid is returned
        as is; otherwise a new row is built and added to the session.
        """
        if session:
            lookup = cast(
                Query,
                session.query(SQLPrivateQuery).where(
                    SQLPrivateQuery.uuid == str(self.uuid)
                ),
            )
            stored = lookup.one_or_none()
            if stored:
                return cast(SQLPrivateQuery, stored)
        if not self.parameters:
            raise ValueError(
                'self.parameters should not be set to None for this query type'
            )
        row = SQLPrivateQuery(
            type=PrivateQueryType.multiple_query,
            uuid=str(self.uuid),
            status=self.status,
            subqueries=[q.sql(session) for q in self.subqueries],
            parameters=self.parameters,
        )
        if session:
            session.add(row)
        return row

    def protobuf(self) -> ProtoPrivateQuery:
        """Return the protobuf message, including the parameters and the
        sub-queries' uuids."""
        message = ProtoPrivateQuery(
            uuid=str(self.uuid),
            type=self.type.name,
            status=self.status.name,
            parameters=self.parameters,
            subqueries=[str(subq.uuid) for subq in self.subqueries],
        )
        return message