import contextlib
import functools
import itertools
import logging
import os
import weakref
from dataclasses import dataclass
from functools import partial
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, TypeVar, Union
from weakref import ReferenceType
import torch
from torch._guards import Source
from torch._ops import OpOverload
from torch._prims_common import (
elementwise_dtypes,
ELEMENTWISE_TYPE_PROMOTION_KIND,
is_float_dtype,
is_integer_dtype,
)
from torch._subclasses.meta_utils import MetaConverter
from torch.fx.operator_schemas import normalize_function
from torch.multiprocessing.reductions import StorageWeakRef
from torch.overrides import TorchFunctionMode
from torch.utils._mode_utils import no_dispatch
from torch.utils._python_dispatch import TorchDispatchMode
from torch.utils._pytree import PyTree, tree_flatten, tree_map, tree_map_only
from torch.utils._stats import count, count_label
from torch.utils.weak import WeakIdRef
log = logging.getLogger(__name__)
pytree = torch.utils._pytree
T = TypeVar("T")
TensorWeakRef = Any
aten = torch._ops.ops.aten
CONSTANT_NUMEL_LIMIT = 1
RECURSION_COUNT = 0
# Small helper that increments recursion count, and
# resets it when the object goes out of scope. Useful
# if you don't want to increase indentation which is
# what a context manager would do.
class IncrementRecursionCount:
def __init__(self):
global RECURSION_COUNT
RECURSION_COUNT += 1
def __del__(self):
global RECURSION_COUNT
RECURSION_COUNT -= 1
@dataclass
class UnsupportedFakeTensorException(RuntimeError):
reason: str
@dataclass
class DynamicOutputShapeException(RuntimeError):
func: OpOverload
@dataclass
class DataDependentOutputException(RuntimeError):
func: OpOverload
_device_not_kwarg_ops = (
aten._resize_output_.default,
aten._nested_tensor_from_tensor_list.default,
aten._nested_tensor_from_tensor_list.out,
aten.pin_memory.default,
aten.is_pinned.default,
aten.to.device,
aten.to.prim_Device,
aten._pin_memory.default,
aten._pin_memory.out,
aten._resize_output.default,
aten._resize_output.out,
)
# this op is never actually used
_non_kwarg_device_constructors = (aten._list_to_tensor,)
def contains_tensor_types(type):
tensor_type = torch._C.TensorType.get()
return type.isSubtypeOf(tensor_type) or any(
contains_tensor_types(e) for e in type.containedTypes()
)
_like_tensor_constructors = (
aten.empty_like.default,
aten.empty_like.out,
aten.full_like.default,
aten.full_like.out,
aten.ones_like.default,
aten.ones_like.out,
aten.rand_like.default,
aten.rand_like.out,
aten.randn_like.default,
aten.randn_like.out,
aten.randint_like.default,
aten.randint_like.out,
aten.randint_like.low_dtype,
aten.randint_like.low_dtype_out,
aten.zeros_like.default,
aten.zeros_like.out,
aten.new_empty.default,
aten.new_empty.out,
aten.new_empty_strided.default,
aten.new_empty_strided.out,
aten.new_full.default,
aten.new_full.out,
aten.new_zeros.default,
aten.new_zeros.out,
aten.new_ones.default,
aten.new_ones.out,
)
@functools.lru_cache(None)
def _is_tensor_constructor(func: OpOverload):
assert isinstance(func, OpOverload)
schema = func._schema
if any(contains_tensor_types(arg.type) for arg in schema.arguments):
return False
# TODO: no real reason to restrict multiple outputs
return (
len(schema.returns) == 1 and schema.returns[0].type is torch._C.TensorType.get()
)
@functools.lru_cache(None)
def get_schema_info(func):
return torch._C._SchemaInfo(func._schema) # type: ignore[attr-defined]
# many of the decompositions registered to torch/_prims do not at the moment model
# aliasing or strides, so as an incremental step, just enable the decompositions in
# torch/_decomp/decompositions.py.
# decomps are used for aot autograd tracing so we would like to unify on their
# implementation and add additional testing to them
@functools.lru_cache(None)
def torch_decomp_decompositions(func):
from torch._decomp import decomposition_table
decompositions = torch._decomp.decompositions
decomp_attrs = [getattr(decompositions, attr) for attr in dir(decompositions)]
return decomposition_table[func] in decomp_attrs
def tree_flatten_only(ty: Type[T], pytree: PyTree):
flat_vals, _ = tree_flatten(pytree)
return [elem for elem in flat_vals if isinstance(elem, ty)]
# Similar to `MetaConverter`, this is a class for converting
# multiple tensors into fake tensors which share the same view/storage
# structure. Like `MetaConverter`, it uses `WeakIdRef` to
# hold a weak reference for all memoized tensors.
class FakeTensorConverter:
@property
def tensor_memo(self):
return self.meta_converter.tensor_memo
meta_converter: MetaConverter
constant_storage_mapping: Dict[StorageWeakRef, List[ReferenceType]]
def __init__(self):
self.meta_converter = MetaConverter()
# map from to storage to corresponding constant tensors
self.constant_storage_mapping = {}
def add_constant_storage_mapping(self, fake_tensor):
# when you have a constant, aliased tensor:
# const_tensor.add_(torch.rand([1]))
# all aliases of it must become no longer const
assert isinstance(fake_tensor, FakeTensor) and fake_tensor.constant is not None
weak_st = StorageWeakRef(fake_tensor.constant._typed_storage())
# we need a map from a weak storage to all of its corresponding
# constant tensors. python doesn't have the weak value equivalent
# of defaultdict(list), so we are using a WeakValueDictionary as one
if weak_st not in self.constant_storage_mapping:
self.constant_storage_mapping[weak_st] = []
self.constant_storage_mapping[weak_st].append(weakref.ref(fake_tensor))
def invalidate_constant_aliases(self, tensor):
assert not isinstance(tensor, FakeTensor)
weak_st = StorageWeakRef(tensor._typed_storage())
if weak_st not in self.constant_storage_mapping:
return
for weak_tensor_ref in self.constant_storage_mapping[weak_st]:
ten = weak_tensor_ref()
if ten is not None:
ten._fix_weakref()
ten.constant = None
del self.constant_storage_mapping[weak_st]
def _get_memo(self, t):
if WeakIdRef(t) in self.tensor_memo:
out = self.tensor_memo[WeakIdRef(t)]
out._fix_weakref()
return out
return None
def set_tensor_memo(self, t, v):
th = WeakIdRef(t)
# hold a weak ref to self, otherwise it will be kept alive
# by the del_ten closure
self_weak_ref = weakref.ref(self)
def del_ten():
self_ref = self_weak_ref()
if self_ref is None:
return
# on shutdown, th may not be in memo
self_ref.tensor_memo.pop(th, None)
weakref.finalize(t, del_ten)
self.tensor_memo[th] = v
def from_real_tensor(
self,
fake_mode,
t,
make_constant=False,
shape_env=None,
ignore_subclass=False,
*,
source=None,
):
maybe_memo = self._get_memo(t)
if maybe_memo is not None:
return maybe_memo
existing_device = t.device
# not yet supported in metatensors
if t.is_quantized:
raise UnsupportedFakeTensorException("quantized nyi in meta tensors")
if type(t) is torch.nn.Parameter:
assert not make_constant
def mk_fake_tensor(make_meta_t):
# NB: don't use in_kernel_invocation_manager. to
# ensure FakeTensor can internally do constant computation
# as necessary. Invocation manager is "more correct" as
# it works for more operators in make_meta_t, but
# invariant is that make_meta_t only calls factories
# for which it is not strictly necessary to use the
# invocation manager (I think!)
with no_dispatch():
return FakeTensor(
fake_mode,
make_meta_t(),
existing_device,
constant=t if make_constant else None,
)
out = self.meta_converter(
t,
shape_env=shape_env,
callback=mk_fake_tensor,
ignore_subclass=ignore_subclass,
source=source,
)
if out is NotImplemented:
raise UnsupportedFakeTensorException("meta converter nyi")
if make_constant:
self.add_constant_storage_mapping(out)
# NB: meta_converter set the memo
return out
# If you specify the device, it MUST be a meta tensor.
def from_meta_and_device(self, fake_mode, t, device):
assert (
t.device.type == "meta"
), f"tensor's device must be `meta`, got {t.device.type} instead"
maybe_memo = self._get_memo(t)
if maybe_memo is not None:
return maybe_memo
out = FakeTensor(fake_mode, t, device)
self.set_tensor_memo(t, out)
return out
# You can have a real tensor that you need to convert into a fake tensor.
# If you have a meta tensor already, call from_meta_and_device.
#
# You're allowed to pass a meta tensor to be turned into a fake
# tensor; although an odd thing to do, this can occur if you're doing
# cross ref testing and the inner test is already operating on meta tensors.
def __call__(
self,
fake_mode,
t,
*,
make_constant=False,
shape_env=None,
ignore_subclass=False,
source=None,
):
return self.from_real_tensor(
fake_mode,
t,
make_constant,
shape_env=shape_env,
ignore_subclass=ignore_subclass,
source=source,
)
op_implementations = []
def register_op_impl(run_impl_check: Union[Callable[[OpOverload], bool], OpOverload]):
def impl_decorator(op_impl):
global op_implementations
if isinstance(run_impl_check, OpOverload):
op_implementations.append((lambda func: func == run_impl_check, op_impl))
else:
op_implementations.append((run_impl_check, op_impl))
return op_impl
return impl_decorator
@register_op_impl(
lambda func: (_is_tensor_constructor(func) or func in _like_tensor_constructors)
)
def constructors(fake_mode, func, *args, **kwargs):
assert func not in _non_kwarg_device_constructors
_, new_kwargs = normalize_function(
Loading ...