import abc
import cmath
import collections.abc
import contextlib
import warnings
from typing import (
Any,
Callable,
Collection,
Dict,
List,
NoReturn,
Optional,
Sequence,
Tuple,
Type,
Union,
)
import torch
try:
import numpy as np
NUMPY_AVAILABLE = True
except ModuleNotFoundError:
NUMPY_AVAILABLE = False
class ErrorMeta(Exception):
"""Internal testing exception that makes that carries error metadata."""
def __init__(
self, type: Type[Exception], msg: str, *, id: Tuple[Any, ...] = ()
) -> None:
super().__init__(
"If you are a user and see this message during normal operation "
"please file an issue at https://github.com/pytorch/pytorch/issues. "
"If you are a developer and working on the comparison functions, please `raise ErrorMeta().to_error()` "
"for user facing errors."
)
self.type = type
self.msg = msg
self.id = id
def to_error(
self, msg: Optional[Union[str, Callable[[str], str]]] = None
) -> Exception:
if not isinstance(msg, str):
generated_msg = self.msg
if self.id:
generated_msg += f"\n\nThe failure occurred for item {''.join(str([item]) for item in self.id)}"
msg = msg(generated_msg) if callable(msg) else generated_msg
return self.type(msg)
# Some analysis of tolerance by logging tests from test_torch.py can be found in
# https://github.com/pytorch/pytorch/pull/32538.
# {dtype: (rtol, atol)}
_DTYPE_PRECISIONS = {
torch.float16: (0.001, 1e-5),
torch.bfloat16: (0.016, 1e-5),
torch.float32: (1.3e-6, 1e-5),
torch.float64: (1e-7, 1e-7),
torch.complex32: (0.001, 1e-5),
torch.complex64: (1.3e-6, 1e-5),
torch.complex128: (1e-7, 1e-7),
}
# The default tolerances of torch.float32 are used for quantized dtypes, because quantized tensors are compared in
# their dequantized and floating point representation. For more details see `TensorLikePair._compare_quantized_values`
_DTYPE_PRECISIONS.update(
{
dtype: _DTYPE_PRECISIONS[torch.float32]
for dtype in (
torch.quint8,
torch.quint2x4,
torch.quint4x2,
torch.qint8,
torch.qint32,
)
}
)
def default_tolerances(
*inputs: Union[torch.Tensor, torch.dtype],
dtype_precisions: Optional[Dict[torch.dtype, Tuple[float, float]]] = None,
) -> Tuple[float, float]:
"""Returns the default absolute and relative testing tolerances for a set of inputs based on the dtype.
See :func:`assert_close` for a table of the default tolerance for each dtype.
Returns:
(Tuple[float, float]): Loosest tolerances of all input dtypes.
"""
dtypes = []
for input in inputs:
if isinstance(input, torch.Tensor):
dtypes.append(input.dtype)
elif isinstance(input, torch.dtype):
dtypes.append(input)
else:
raise TypeError(
f"Expected a torch.Tensor or a torch.dtype, but got {type(input)} instead."
)
dtype_precisions = dtype_precisions or _DTYPE_PRECISIONS
rtols, atols = zip(*[dtype_precisions.get(dtype, (0.0, 0.0)) for dtype in dtypes])
return max(rtols), max(atols)
def get_tolerances(
*inputs: Union[torch.Tensor, torch.dtype],
rtol: Optional[float],
atol: Optional[float],
id: Tuple[Any, ...] = (),
) -> Tuple[float, float]:
"""Gets absolute and relative to be used for numeric comparisons.
If both ``rtol`` and ``atol`` are specified, this is a no-op. If both are not specified, the return value of
:func:`default_tolerances` is used.
Raises:
ErrorMeta: With :class:`ValueError`, if only ``rtol`` or ``atol`` is specified.
Returns:
(Tuple[float, float]): Valid absolute and relative tolerances.
"""
if (rtol is None) ^ (atol is None):
# We require both tolerance to be omitted or specified, because specifying only one might lead to surprising
# results. Imagine setting atol=0.0 and the tensors still match because rtol>0.0.
raise ErrorMeta(
ValueError,
f"Both 'rtol' and 'atol' must be either specified or omitted, "
f"but got no {'rtol' if rtol is None else 'atol'}.",
id=id,
)
elif rtol is not None and atol is not None:
return rtol, atol
else:
return default_tolerances(*inputs)
def _make_mismatch_msg(
*,
default_identifier: str,
identifier: Optional[Union[str, Callable[[str], str]]] = None,
extra: Optional[str] = None,
abs_diff: float,
abs_diff_idx: Optional[Union[int, Tuple[int, ...]]] = None,
atol: float,
rel_diff: float,
rel_diff_idx: Optional[Union[int, Tuple[int, ...]]] = None,
rtol: float,
) -> str:
"""Makes a mismatch error message for numeric values.
Args:
default_identifier (str): Default description of the compared values, e.g. "Tensor-likes".
identifier (Optional[Union[str, Callable[[str], str]]]): Optional identifier that overrides
``default_identifier``. Can be passed as callable in which case it will be called with
``default_identifier`` to create the description at runtime.
extra (Optional[str]): Extra information to be placed after the message header and the mismatch statistics.
abs_diff (float): Absolute difference.
abs_diff_idx (Optional[Union[int, Tuple[int, ...]]]): Optional index of the absolute difference.
atol (float): Allowed absolute tolerance. Will only be added to mismatch statistics if it or ``rtol`` are
``> 0``.
rel_diff (float): Relative difference.
rel_diff_idx (Optional[Union[int, Tuple[int, ...]]]): Optional index of the relative difference.
rtol (float): Allowed relative tolerance. Will only be added to mismatch statistics if it or ``atol`` are
``> 0``.
"""
equality = rtol == 0 and atol == 0
def make_diff_msg(
*,
type: str,
diff: float,
idx: Optional[Union[int, Tuple[int, ...]]],
tol: float,
) -> str:
if idx is None:
msg = f"{type.title()} difference: {diff}"
else:
msg = f"Greatest {type} difference: {diff} at index {idx}"
if not equality:
msg += f" (up to {tol} allowed)"
return msg + "\n"
if identifier is None:
identifier = default_identifier
elif callable(identifier):
identifier = identifier(default_identifier)
msg = f"{identifier} are not {'equal' if equality else 'close'}!\n\n"
if extra:
msg += f"{extra.strip()}\n"
msg += make_diff_msg(type="absolute", diff=abs_diff, idx=abs_diff_idx, tol=atol)
msg += make_diff_msg(type="relative", diff=rel_diff, idx=rel_diff_idx, tol=rtol)
return msg.strip()
def make_scalar_mismatch_msg(
actual: Union[int, float, complex],
expected: Union[int, float, complex],
*,
rtol: float,
atol: float,
identifier: Optional[Union[str, Callable[[str], str]]] = None,
) -> str:
"""Makes a mismatch error message for scalars.
Args:
actual (Union[int, float, complex]): Actual scalar.
expected (Union[int, float, complex]): Expected scalar.
rtol (float): Relative tolerance.
atol (float): Absolute tolerance.
identifier (Optional[Union[str, Callable[[str], str]]]): Optional description for the scalars. Can be passed
as callable in which case it will be called by the default value to create the description at runtime.
Defaults to "Scalars".
"""
abs_diff = abs(actual - expected)
rel_diff = float("inf") if expected == 0 else abs_diff / abs(expected)
return _make_mismatch_msg(
default_identifier="Scalars",
identifier=identifier,
abs_diff=abs_diff,
atol=atol,
rel_diff=rel_diff,
rtol=rtol,
)
def make_tensor_mismatch_msg(
actual: torch.Tensor,
expected: torch.Tensor,
mismatches: torch.Tensor,
*,
rtol: float,
atol: float,
identifier: Optional[Union[str, Callable[[str], str]]] = None,
):
"""Makes a mismatch error message for tensors.
Args:
actual (torch.Tensor): Actual tensor.
expected (torch.Tensor): Expected tensor.
mismatches (torch.Tensor): Boolean mask of the same shape as ``actual`` and ``expected`` that indicates the
location of mismatches.
rtol (float): Relative tolerance.
atol (float): Absolute tolerance.
identifier (Optional[Union[str, Callable[[str], str]]]): Optional description for the tensors. Can be passed
as callable in which case it will be called by the default value to create the description at runtime.
Defaults to "Tensor-likes".
"""
def unravel_flat_index(flat_index: int) -> Tuple[int, ...]:
if not mismatches.shape:
return ()
inverse_index = []
for size in mismatches.shape[::-1]:
div, mod = divmod(flat_index, size)
flat_index = div
inverse_index.append(mod)
return tuple(inverse_index[::-1])
number_of_elements = mismatches.numel()
total_mismatches = torch.sum(mismatches).item()
extra = (
f"Mismatched elements: {total_mismatches} / {number_of_elements} "
f"({total_mismatches / number_of_elements:.1%})"
)
a_flat = actual.flatten()
b_flat = expected.flatten()
matches_flat = ~mismatches.flatten()
abs_diff = torch.abs(a_flat - b_flat)
# Ensure that only mismatches are used for the max_abs_diff computation
abs_diff[matches_flat] = 0
max_abs_diff, max_abs_diff_flat_idx = torch.max(abs_diff, 0)
rel_diff = abs_diff / torch.abs(b_flat)
# Ensure that only mismatches are used for the max_rel_diff computation
rel_diff[matches_flat] = 0
max_rel_diff, max_rel_diff_flat_idx = torch.max(rel_diff, 0)
return _make_mismatch_msg(
default_identifier="Tensor-likes",
identifier=identifier,
extra=extra,
abs_diff=max_abs_diff.item(),
abs_diff_idx=unravel_flat_index(int(max_abs_diff_flat_idx)),
atol=atol,
rel_diff=max_rel_diff.item(),
rel_diff_idx=unravel_flat_index(int(max_rel_diff_flat_idx)),
rtol=rtol,
)
class UnsupportedInputs(Exception): # noqa: B903
"""Exception to be raised during the construction of a :class:`Pair` in case it doesn't support the inputs."""
class Pair(abc.ABC):
"""ABC for all comparison pairs to be used in conjunction with :func:`assert_equal`.
Each subclass needs to overwrite :meth:`Pair.compare` that performs the actual comparison.
Each pair receives **all** options, so select the ones applicable for the subclass and forward the rest to the
super class. Raising an :class:`UnsupportedInputs` during constructions indicates that the pair is not able to
handle the inputs and the next pair type will be tried.
All other errors should be raised as :class:`ErrorMeta`. After the instantiation, :meth:`Pair._make_error_meta` can
be used to automatically handle overwriting the message with a user supplied one and id handling.
"""
def __init__(
self,
actual: Any,
expected: Any,
*,
id: Tuple[Any, ...] = (),
**unknown_parameters: Any,
) -> None:
self.actual = actual
self.expected = expected
self.id = id
self._unknown_parameters = unknown_parameters
@staticmethod
def _inputs_not_supported() -> NoReturn:
raise UnsupportedInputs()
@staticmethod
def _check_inputs_isinstance(*inputs: Any, cls: Union[Type, Tuple[Type, ...]]):
"""Checks if all inputs are instances of a given class and raise :class:`UnsupportedInputs` otherwise."""
if not all(isinstance(input, cls) for input in inputs):
Pair._inputs_not_supported()
Loading ...