import collections
import contextlib
import functools
import importlib
import inspect
import random
import types
from typing import Dict, List
import torch.nn
from .. import variables
from ..exc import unimplemented
from ..guards import GuardBuilder
from ..source import AttrSource, ODictGetItemSource, RandomValueSource
from ..utils import is_namedtuple_cls, namedtuple_fields
from .base import MutableLocal, VariableTracker
from .misc import NullContextVariable
class UserDefinedVariable(VariableTracker):
pass
class UserDefinedClassVariable(UserDefinedVariable):
def __init__(self, value, **kwargs):
super().__init__(**kwargs)
self.value = value
def as_python_constant(self):
return self.value
def python_type(self):
return type(self.value)
def var_getattr(self, tx, name: str) -> "VariableTracker":
from . import ConstantVariable
from .builder import VariableBuilder
options = VariableTracker.propagate(self)
source = AttrSource(self.source, name) if self.source is not None else None
try:
obj = inspect.getattr_static(self.value, name)
except AttributeError:
obj = None
if isinstance(obj, staticmethod):
return variables.UserFunctionVariable(
obj.__get__(self.value), source=source, **options
)
elif isinstance(obj, classmethod):
return variables.UserMethodVariable(
obj.__func__, self, source=source, **options
)
if name in getattr(self.value, "__dict__", {}) or ConstantVariable.is_literal(
obj
):
if source:
return VariableBuilder(tx, source)(obj).add_options(options)
elif ConstantVariable.is_literal(obj):
return ConstantVariable(obj, **options)
return super().var_getattr(tx, name)
def call_method(
self,
tx,
name,
args: "List[VariableTracker]",
kwargs: "Dict[str, VariableTracker]",
) -> "VariableTracker":
if (
name == "__subclasses__"
and len(args) == 0
and not kwargs
and "__subclasses__" not in self.value.__dict__
):
options = VariableTracker.propagate(self, args, kwargs.values())
options["mutable_local"] = MutableLocal()
subs_as_vars: List[VariableTracker] = list()
for sub in self.value.__subclasses__():
source = AttrSource(tx.import_source(sub.__module__), sub.__name__)
subs_as_vars.append(
variables.UserDefinedClassVariable(sub, source=source)
)
return variables.ListVariable(subs_as_vars, **options)
return super().call_method(tx, name, args, kwargs)
def call_function(
self, tx, args: "List[VariableTracker]", kwargs: "Dict[str, VariableTracker]"
) -> "VariableTracker":
from ..side_effects import SideEffects
options = VariableTracker.propagate(self, args, kwargs.values())
if self.value in (
contextlib.nullcontext,
torch.autograd.profiler.profile,
):
return NullContextVariable(**options)
elif is_namedtuple_cls(self.value):
fields = namedtuple_fields(self.value)
items = list(args)
items.extend([None] * (len(fields) - len(items)))
for name, value in kwargs.items():
assert name in fields
items[fields.index(name)] = value
assert all(x is not None for x in items)
return variables.NamedTupleVariable(
items, self.value, **VariableTracker.propagate(self, items)
)
elif (
inspect.getattr_static(self.value, "__new__", None) in (object.__new__,)
and SideEffects.cls_supports_mutation_side_effects(self.value)
and self.source
):
var = tx.output.side_effects.track_object_new(
self.source, self.value, UserDefinedObjectVariable, options
)
return var.add_options(var.call_method(tx, "__init__", args, kwargs))
elif variables.DataClassVariable.is_matching_cls(self.value):
options["mutable_local"] = MutableLocal()
return variables.DataClassVariable.create(self.value, args, kwargs, options)
return super().call_function(tx, args, kwargs)
def const_getattr(self, tx, name):
if name == "__name__":
return self.value.__name__
return super().const_getattr(tx, name)
class UserDefinedObjectVariable(UserDefinedVariable):
"""
Mostly objects of defined type. Catch-all for something where we only know the type.
"""
def __init__(self, value, value_type=None, **kwargs):
super().__init__(**kwargs)
self.value = value
self.value_type = value_type or type(value)
assert type(value) is self.value_type
def __str__(self):
inner = self.value_type.__name__
if inner in [
"builtin_function_or_method",
"getset_descriptor",
"method_descriptor",
"method",
]:
inner = str(getattr(self.value, "__name__", None))
return f"{self.__class__.__name__}({inner})"
def python_type(self):
return self.value_type
@staticmethod
@functools.lru_cache(None)
def _supported_random_functions():
fns = {
random.random,
random.randint,
random.randrange,
random.uniform,
}
return fns
def call_method(
self,
tx,
name,
args: "List[VariableTracker]",
kwargs: "Dict[str, VariableTracker]",
) -> "VariableTracker":
from . import ConstantVariable, TupleVariable, UserMethodVariable
options = VariableTracker.propagate(self, args, kwargs.values())
if name not in getattr(self.value, "__dict__", {}):
try:
method = inspect.getattr_static(type(self.value), name)
except AttributeError:
method = None
if method is object.__init__:
return ConstantVariable(None, **options)
if method is collections.OrderedDict.keys and self.source:
# subclass of OrderedDict
assert not (args or kwargs)
keys = list(self.value.keys())
assert all(map(ConstantVariable.is_literal, keys))
return TupleVariable(
[ConstantVariable(k, **options) for k in keys], **options
).add_guard(self.source.make_guard(GuardBuilder.ODICT_KEYS))
if (
method is collections.OrderedDict.items
and isinstance(self.value, collections.OrderedDict)
and self.source
):
assert not (args or kwargs)
items = []
keys = self.call_method(tx, "keys", [], {})
options = VariableTracker.propagate(self, args, kwargs.values(), keys)
for key in keys.unpack_var_sequence(tx):
items.append(
TupleVariable(
[key, self.odict_getitem(tx, key)],
**options,
)
)
return TupleVariable(items, **options)
if method is collections.OrderedDict.__getitem__ and len(args) == 1:
assert not kwargs
return self.odict_getitem(tx, args[0])
# check for methods implemented in C++
if isinstance(method, types.FunctionType):
source = (
None
if self.source is None
else AttrSource(AttrSource(self.source, "__class__"), name)
)
# TODO(jansel): add a guard to check for monkey patching?
return UserMethodVariable(
method, self, source=source, **options
).call_function(tx, args, kwargs)
return super().call_method(tx, name, args, kwargs)
def is_supported_random(self):
try:
return self.value in self._supported_random_functions()
except TypeError:
# TypeError: unhashable type
return False
def call_function(
self, tx, args: "List[VariableTracker]", kwargs: "Dict[str, VariableTracker]"
) -> "VariableTracker":
from .builder import VariableBuilder
if (
self.is_supported_random()
and all(k.is_python_constant() for k in args)
and all(v.is_python_constant() for v in kwargs.values())
):
args = [x.as_python_constant() for x in args]
kwargs = {k: v.as_python_constant() for k, v in kwargs.items()}
random_call_index = len(tx.random_calls)
if random_call_index == 0:
tx.output.initial_random_state = random.getstate()
example_value = self.value(*args, **kwargs)
source = RandomValueSource(random_call_index)
tx.random_calls.append((self.value, args, kwargs))
return VariableBuilder(tx, source).wrap_unspecialized_primitive(
example_value
)
return super().call_function(tx, args, kwargs)
def _check_for_getattribute(self):
try:
if isinstance(
inspect.getattr_static(type(self.value), "__getattribute__"),
types.FunctionType,
):
unimplemented("UserDefinedObjectVariable with custom __getattribute__")
except AttributeError:
pass
def _check_for_getattr(self):
try:
getattr_fn = inspect.getattr_static(type(self.value), "__getattr__")
except AttributeError:
getattr_fn = None
if getattr_fn is torch.nn.Module.__getattr__:
# ignore this case of getattr
getattr_fn = None
return getattr_fn
def _getattr_static(self, name):
if (
isinstance(self.value, torch.nn.Module)
or "__slots__" in self.value.__class__.__dict__
):
# getattr_static doesn't work on these
subobj = getattr(self.value, name)
else:
subobj = inspect.getattr_static(self.value, name)
return subobj
def var_getattr(self, tx, name):
from . import ConstantVariable
from .builder import VariableBuilder
options = VariableTracker.propagate(self)
value = self.value
source = AttrSource(self.source, name) if self.source else None
self._check_for_getattribute()
getattr_fn = self._check_for_getattr()
try:
subobj = self._getattr_static(name)
except AttributeError:
subobj = None
if isinstance(getattr_fn, types.FunctionType):
return variables.UserMethodVariable(
getattr_fn, self, source=source, **options
).call_function(tx, [ConstantVariable(name)], {})
elif getattr_fn is not None:
unimplemented("UserDefined with non-function __getattr__")
if isinstance(subobj, property):
return variables.UserMethodVariable(
subobj.fget, self, source=source, **options
).call_function(tx, [], {})
elif isinstance(subobj, staticmethod):
return variables.UserFunctionVariable(
subobj.__get__(self.value), source=source, **options
)
elif isinstance(subobj, classmethod):
return variables.UserMethodVariable(
subobj.__func__, self, source=source, **options
)
elif isinstance(subobj, types.FunctionType):
return variables.UserMethodVariable(subobj, self, source=source, **options)
if (
name in getattr(value, "__dict__", {})
or ConstantVariable.is_literal(subobj)
or isinstance(
subobj,
(
torch.Tensor,
torch.nn.Module,
),
)
):
if source:
return VariableBuilder(tx, source)(subobj).add_options(options)
Loading ...