Repository URL to install this package:
|
Version:
2.7.2 ▾
|
# pylint: disable=too-many-arguments
"""Functions to retrieve optimal hyperparameters from a target epsilon by
bisection"""
from typing import Callable, Optional, Sequence, Tuple
import numpy as np
from sarus_differential_privacy.accountant import PrivacyAccountant
from sarus_differential_privacy.accountant_local import LocalPrivacyAccountant
from sarus_differential_privacy.query import (
PrivateQuery,
SampledGaussianMechanismQuery,
)
from scipy.optimize import bisect
def _check_arguments(
query_factory: Callable[[float], PrivateQuery],
target_epsilon: float,
target_delta: float,
accountant: Optional[PrivacyAccountant] = None,
alphas: Optional[Sequence[float]] = None,
min_scale: float = 1e-8,
) -> Tuple[PrivacyAccountant, Tuple[float, float]]:
"""Check if bisection arguments are ok.
Returns searchspace"""
searchspace = (min_scale, 1.0) # not zero to prevent ZeroDivisionErrors
assert target_epsilon > 0 and 0 <= target_delta <= 1
if accountant:
priv_acc = accountant
assert (
priv_acc.epsilon(target_delta) <= target_epsilon
), "epsilon is already above target_epsilon for this accountant"
else:
priv_acc = LocalPrivacyAccountant(alphas=alphas)
# check if target is in interval
min_epsilon = priv_acc.epsilon_new_query(
target_delta, query_factory(searchspace[0])
)
assert min_epsilon <= target_epsilon, (
"The smallest creatable query with this query_factory already "
"consumes too much privacy"
)
for _ in range(16):
max_epsilon = priv_acc.epsilon_new_query(
target_delta, query_factory(searchspace[1])
)
if max_epsilon >= target_epsilon:
break
else:
searchspace = (searchspace[1], searchspace[1] * 2)
return priv_acc, searchspace
def best_query(
query_factory: Callable[[float], PrivateQuery],
target_epsilon: float,
target_delta: float,
tolerance: float = 1e-1,
accountant: Optional[PrivacyAccountant] = None,
alphas: Optional[Sequence[float]] = None,
max_iter: int = 50,
min_scale: float = 1e-8,
) -> PrivateQuery:
"""Retrieve best noise parameter for target (epsilon, delta) by bisection
and return the fitting query.
Note: the returned query is a different object than the argument query,
with a different uuid.
Arguments:
query_factory: callable directing how to update query given epsilon scale.
It takes as argument a float between 0 and 1. Multiplying by the
maximum noise value is expected to be done inside the query_factory.
target_epsilon, target_delta: target privacy parameters
tolerance: returned epsilon will be between
[target_epsilon * (1 - tolerance), target_epsilon]
accountant: accountant in which query is to be added. Influence the value
of epsilon
max_iter: max number of iterations before returning
min_scale: minimum epsilon scale. To prevent usual ZeroDivisionErrors
from common `1 / scale_factor` computations.
:returns: best_query: query of the same type as query, updated with
query_factory
:rtype: PrivateQuery
:raises AssertionError: if:
- target_epsilon <= 0 and target_delta < 0 or > 1
- accountant.epsilon(target_delta) > target_epsilon before adding any
query,
- smallest query is already above target_epsilon
- query_factory is not increasing
- target_epsilon cannot be reach even after increasing the searchspace
"""
priv_acc, searchspace = _check_arguments(
query_factory,
target_epsilon,
target_delta,
accountant,
alphas,
min_scale,
)
# if max scale is still not enough, return the max
if (
priv_acc.epsilon_new_query(target_delta, query_factory(searchspace[0]))
>= target_epsilon
):
return query_factory(searchspace[1])
# run actual bisection
best_q = query_factory(
searchspace[0]
) # query returned if convergence is not reached
for _ in range(max_iter):
scale_factor = np.mean(searchspace)
try:
query = query_factory(scale_factor)
epsilon = priv_acc.epsilon_new_query(target_delta, query)
except Exception: # pylint: disable=broad-except
return best_q
if target_epsilon * (1 - tolerance) < epsilon <= target_epsilon:
return query
if epsilon > target_epsilon:
searchspace = (searchspace[0], scale_factor)
else:
searchspace = (scale_factor, searchspace[1])
best_q = query
return best_q
def best_query_scipy(
query_factory: Callable[[float], PrivateQuery],
target_epsilon: float,
target_delta: float,
tolerance: float = 1e-1,
accountant: Optional[PrivacyAccountant] = None,
alphas: Optional[Sequence[float]] = None,
max_iter: int = 50,
min_scale: float = 1e-8,
) -> PrivateQuery:
"""Retrieve best noise parameter for target (epsilon, delta) by bisection
using scipy.optimize.bisect and return the fitting query.
Note: the returned query is a different object than the argument query,
with a different uuid.
Warning: with this function, we cannot return the best query if an
exception is thrown by the query_factory.
Arguments:
query_factory: callable directing how to update query given epsilon scale.
It takes as argument a float between 0 and 1. Multiplying by the
maximum noise value is expected to be done inside the query_factory.
target_epsilon, target_delta: target privacy parameters
tolerance: returned epsilon will be between
[target_epsilon * (1 - tolerance), target_epsilon]
accountant: accountant in which query is to be added. Influence the value
of epsilon
max_iter: max number of iterations before returning
min_scale: minimum epsilon scale. To prevent usual ZeroDivisionErrors
from common `1 / scale_factor` computations.
:returns: best_query: query of the same type as query, updated with
query_factory
:rtype: PrivateQuery
:raises AssertionError: if:
- target_epsilon <= 0 and target_delta < 0 or > 1
- accountant.epsilon(target_delta) > target_epsilon before adding any
query,
- smallest query is already above target_epsilon
- query_factory is not increasing
- target_epsilon cannot be reach even after increasing the searchspace
"""
priv_acc, searchspace = _check_arguments(
query_factory,
target_epsilon,
target_delta,
accountant,
alphas,
min_scale,
)
# run actual bisection
xtol = target_epsilon * tolerance / 2
best_scale = bisect(
lambda scale: priv_acc.epsilon_new_query(
target_delta, query_factory(scale)
)
- target_epsilon
+ xtol,
searchspace[0],
searchspace[1],
maxiter=max_iter,
xtol=xtol * (1 - 4 * np.finfo(float).eps), # compensating for rtol
)
return query_factory(best_scale)
def best_epsilon(
query_factory: Callable[[float], PrivateQuery],
target_epsilon: float,
target_delta: float,
tolerance: float = 1e-1,
accountant: Optional[PrivacyAccountant] = None,
alphas: Optional[Sequence[float]] = None,
max_iter: int = 50,
min_scale: float = 1e-5,
) -> float:
"""Retrieve best epsilon_multiplier parameter for target (epsilon, delta)
by bisection.
Arguments:
query_factory: callable directing how to update query given current
epsilon_multiplier.
target_epsilon, target_delta: target privacy parameters
tolerance: returned epsilon will be between [target_epsilon - tolerance,
target_epsilon]
accountant: accountant in which query is to be added. Influence the value
of epsilon
max_iter: max number of iterations before returning
min_scale: minimum epsilon scale. To prevent usual ZeroDivisionErrors
from common `1 / eps_scale` computations.
:returns: best_scale: best fitting scale that is below target
:rtype: PrivateQuery
:raises AssertionError: if:
- target_epsilon <= 0 and target_delta < 0 or > 1
- accountant.epsilon(target_delta) > target_epsilon before adding any
query,
- smallest query is already above target_epsilon
- query_factory is not increasing
- target_epsilon cannot be reach even after increasing the searchspace
"""
priv_acc, searchspace = _check_arguments(
query_factory,
target_epsilon,
target_delta,
accountant,
alphas,
min_scale,
)
# if max scale is still not enough, return the max
if (
priv_acc.epsilon_new_query(target_delta, query_factory(searchspace[0]))
>= target_epsilon
):
return searchspace[1]
# run actual bisection
best_scale = min_scale # query returned if convergence is not reached
for _ in range(max_iter):
scale_factor: float = np.mean(searchspace)
try:
query = query_factory(scale_factor)
epsilon = priv_acc.epsilon_new_query(target_delta, query)
except Exception: # pylint: disable=broad-except
return best_scale
if target_epsilon * (1 - tolerance) < epsilon <= target_epsilon:
return scale_factor
if target_epsilon - tolerance < epsilon <= target_epsilon:
return scale_factor
if epsilon > target_epsilon:
searchspace = (searchspace[0], scale_factor)
else:
searchspace = (scale_factor, searchspace[1])
best_scale = scale_factor
return best_scale
def best_epsilon_scipy(
query_factory: Callable[[float], PrivateQuery],
target_epsilon: float,
target_delta: float,
tolerance: float = 1e-1,
accountant: Optional[PrivacyAccountant] = None,
alphas: Optional[Sequence[float]] = None,
max_iter: int = 50,
min_scale: float = 1e-8,
) -> float:
"""Retrieve best noise parameter for target (epsilon, delta) by bisection
using scipy.optimize.bisect and return the fitting query.
Note: the returned query is a different object than the argument query,
with a different uuid.
Warning: with this function, we cannot return the best scale if an
exception is thrown by the query_factory.
Arguments:
query_factory: callable directing how to update query given epsilon scale.
It takes as argument a float between 0 and 1. Multiplying by the
maximum noise value is expected to be done inside the query_factory.
target_epsilon, target_delta: target privacy parameters
tolerance: returned epsilon will be between
[target_epsilon * (1 - tolerance), target_epsilon]
accountant: accountant in which query is to be added. Influence the value
of epsilon
max_iter: max number of iterations before returning
min_scale: minimum epsilon scale. To prevent usual ZeroDivisionErrors
from common `1 / scale_factor` computations.
:returns: best_query: query of the same type as query, updated with
query_factory
:rtype: PrivateQuery
:raises AssertionError: if:
- target_epsilon <= 0 and target_delta < 0 or > 1
- accountant.epsilon(target_delta) > target_epsilon before adding any
query,
- smallest query is already above target_epsilon
- query_factory is not increasing
- target_epsilon cannot be reach even after increasing the searchspace
"""
priv_acc, searchspace = _check_arguments(
query_factory,
target_epsilon,
target_delta,
accountant,
alphas,
min_scale,
)
# run actual bisection
xtol = target_epsilon * tolerance / 2
best_scale: float = bisect(
lambda scale: priv_acc.epsilon_new_query(
target_delta, query_factory(scale)
)
- target_epsilon
+ xtol,
searchspace[0],
searchspace[1],
maxiter=max_iter,
xtol=xtol * (1 - 4 * np.finfo(float).eps), # compensating for rtol
)
return best_scale
def best_steps(
query: SampledGaussianMechanismQuery,
target_epsilon: Optional[float],
target_delta: float,
accountant: Optional[PrivacyAccountant] = None,
alphas: Optional[Sequence[float]] = None,
) -> int:
"""Get the max steps value for (target_epsilon, target_delta) by
bisection.
if accountant is given, it used to compute epsilon. Else, a
LocalPrivacyAccountant is created using alphas.
"""
assert (
0 < query.parameters['sampling_probability'] <= 1
), 'sampling_probability must be > 0 and <= 1'
assert (
query.parameters['noise_multiplier'] > 0
), 'noise_multiplier must be > 0'
priv_acc = (
accountant if accountant else LocalPrivacyAccountant(alphas=alphas)
)
if target_epsilon is None:
target_epsilon = 0.0
if priv_acc.epsilon(delta=target_delta) >= target_epsilon:
return 0
def get_epsilon(i: int) -> float:
query.parameters["steps"] = i
return priv_acc.epsilon_query(target_delta, query)
inf_value = 0
sup_value = 1
# find an upper bound
while get_epsilon(sup_value) < target_epsilon:
sup_value = sup_value * 2
# iterate to find the root between the upper and lower bounds
while sup_value - inf_value > 1:
mean_value = (inf_value + sup_value) // 2
if get_epsilon(mean_value) <= target_epsilon:
inf_value = mean_value
else:
sup_value = mean_value
return inf_value