Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / torch   python

Repository URL to install this package:

Version: 2.0.1+cpu 

/ distributed / _shard / common_op_utils.py

import torch
from torch.utils._pytree import tree_map
from typing import Optional

def _basic_validation(op, args=(), kwargs=None):
    """
    Common validation across all ops go in here.
    """
    from torch.distributed._shard.partial_tensor import _PartialTensor
    from torch.distributed._shard.replicated_tensor import ReplicatedTensor
    from torch.distributed._shard.sharded_tensor import ShardedTensor

    if len(args) == 0 and (kwargs is None or len(kwargs) == 0):
        raise ValueError(f" No input for '{op.__name__}'!")

    # Validate types
    has_distributed_tensor = False

    def is_distributed_tensor(e):
        nonlocal has_distributed_tensor
        if isinstance(e, (ReplicatedTensor, _PartialTensor, ShardedTensor)):
            has_distributed_tensor = True

    tree_map(is_distributed_tensor, args)
    tree_map(is_distributed_tensor, kwargs)

    if not has_distributed_tensor:
        raise TypeError(
            f"torch function '{op.__name__}', with args: {args} and "
            f"kwargs: {kwargs} are called without any distributed tensor!"
        )

    # Validate all distributed tensors use the same PG.
    cur_pg: Optional[torch.distributed.ProcessGroup] = None

    def validate_pg(e):
        nonlocal cur_pg
        if isinstance(e, (ReplicatedTensor, _PartialTensor, ShardedTensor)):
            if cur_pg is not None and e._process_group is not cur_pg:
                raise RuntimeError(
                    'All distributed tensors should use the '
                    'same ProcessGroup if used together in an op.'
                )
            cur_pg = e._process_group

    tree_map(validate_pg, args)
    tree_map(validate_pg, kwargs)

def _register_default_op(op, decorator):
    @decorator(op)
    def tensor_default_op(types, args=(), kwargs=None, pg=None):
        """
        Handles ``__torch_function__`` dispatch for the default tensor ops that
        behave the same as ``torch.Tensor`` such as ``torch.Tensor.shape`` or
        ``torch.Tensor.dtype``. We simply lower to the real op call with
        DisableTorchFunctionSubclass context like ``torch.Tensor.__torch_function__``
        to avoid recursions.
        """
        if kwargs is None:
            kwargs = {}

        with torch._C.DisableTorchFunctionSubclass():
            return op(*args, **kwargs)