Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

edgify / torch   python

Repository URL to install this package:

Version: 2.0.1+cpu 

/ utils / data / datapipes / datapipe.pyi

# This base template ("datapipe.pyi.in") is generated from mypy stubgen with minimal editing for code injection
# The output file will be "datapipe.pyi". This is executed as part of torch/CMakeLists.txt
# Note that, for mypy, a .pyi file takes precedence over the corresponding .py file, so we must define the interface
# for other classes/objects here, even though we are not injecting extra code into them at the moment.

from torch.utils.data.datapipes._typing import _DataPipeMeta, _IterDataPipeMeta
from torch.utils.data.datapipes._hook_iterator import _SnapshotState
from typing import Any, Callable, Dict, Generic, Iterator, List, Optional, TypeVar, Union
from torch.utils.data import Dataset, IterableDataset, default_collate

# Covariant type variable: parameterizes the Dataset / IterableDataset bases of
# MapDataPipe and IterDataPipe, and appears in the callable signatures of
# IterDataPipe.demux and IterDataPipe.groupby.
T_co = TypeVar('T_co', covariant=True)
# Plain (invariant) type variable: the element type of DataChunk.
T = TypeVar('T')
# Declared without a value and typed as Any, so this stub does not describe
# the object's actual contents.
UNTRACABLE_DATAFRAME_PIPES: Any


class MapDataPipe(Dataset[T_co], metaclass=_DataPipeMeta):
    # Stub for the map-style datapipe base class: a Dataset[T_co] subclass
    # created through the _DataPipeMeta metaclass. Every body is elided
    # (`...`); only names, signatures and annotations are declared here.

    # --- Class-level attributes (values elided) ---
    functions: Dict[str, Callable] = ...
    reduce_ex_hook: Optional[Callable] = ...
    getstate_hook: Optional[Callable] = ...
    str_hook: Optional[Callable] = ...
    repr_hook: Optional[Callable] = ...

    # --- Attribute lookup fallback (return type left unannotated) ---
    def __getattr__(self, attribute_name: Any): ...

    # --- Registration entry points ---
    @classmethod
    def register_function(
        cls,
        function_name: Any,
        function: Any,
    ) -> None: ...

    @classmethod
    def register_datapipe_as_function(
        cls,
        function_name: Any,
        cls_to_register: Any,
    ): ...

    # --- Pickling protocol and its class-wide hook setters ---
    def __getstate__(self): ...
    def __reduce_ex__(self, *args: Any, **kwargs: Any): ...

    @classmethod
    def set_getstate_hook(cls, hook_fn: Any) -> None: ...

    @classmethod
    def set_reduce_ex_hook(cls, hook_fn: Any) -> None: ...

    # --- Functional-form methods ---
    # Each "Functional form of ..." marker names the datapipe class that the
    # method below it corresponds to.

    # Functional form of 'BatcherMapDataPipe'
    def batch(
        self,
        batch_size: int,
        drop_last: bool = False,
        wrapper_class=DataChunk,
    ) -> MapDataPipe: ...

    # Functional form of 'ConcaterMapDataPipe'
    def concat(self, *datapipes: MapDataPipe) -> MapDataPipe: ...

    # Functional form of 'MapperMapDataPipe'
    def map(self, fn: Callable = ...) -> MapDataPipe: ...

    # Functional form of 'ShufflerIterDataPipe'
    # Unlike its siblings, this one is declared to return an IterDataPipe.
    def shuffle(
        self,
        *,
        indices: Optional[List] = None,
    ) -> IterDataPipe: ...

    # Functional form of 'ZipperMapDataPipe'
    def zip(self, *datapipes: MapDataPipe[T_co]) -> MapDataPipe: ...


class IterDataPipe(IterableDataset[T_co], metaclass=_IterDataPipeMeta):
    # Stub for the iterable-style datapipe base class: an
    # IterableDataset[T_co] subclass created through the _IterDataPipeMeta
    # metaclass. Every body is elided (`...`); only names, signatures and
    # annotations are declared here.

    # --- Class-level attributes (values elided) ---
    functions: Dict[str, Callable] = ...
    reduce_ex_hook: Optional[Callable] = ...
    getstate_hook: Optional[Callable] = ...
    str_hook: Optional[Callable] = ...
    repr_hook: Optional[Callable] = ...

    # --- Private iteration-state attributes ---
    _number_of_samples_yielded: int = ...
    # NOTE(review): the only attribute in this stub given a concrete value
    # rather than `...` -- confirm it matches the runtime default.
    _snapshot_state: _SnapshotState = _SnapshotState.Iterating
    _fast_forward_iterator: Optional[Iterator] = ...

    # --- Attribute lookup fallback (return type left unannotated) ---
    def __getattr__(self, attribute_name: Any): ...

    # --- Registration entry points ---
    @classmethod
    def register_function(
        cls,
        function_name: Any,
        function: Any,
    ) -> None: ...

    @classmethod
    def register_datapipe_as_function(
        cls,
        function_name: Any,
        cls_to_register: Any,
        enable_df_api_tracing: bool = ...,
    ): ...

    # --- Pickling protocol and its class-wide hook setters ---
    def __getstate__(self): ...
    def __reduce_ex__(self, *args: Any, **kwargs: Any): ...

    @classmethod
    def set_getstate_hook(cls, hook_fn: Any) -> None: ...

    @classmethod
    def set_reduce_ex_hook(cls, hook_fn: Any) -> None: ...

    # --- Functional-form methods ---
    # Each "Functional form of ..." marker names the datapipe class that the
    # method below it corresponds to. All are declared to return an
    # IterDataPipe, except `demux` and `fork`, which return a list of them.

    # Functional form of 'BatcherIterDataPipe'
    def batch(
        self,
        batch_size: int,
        drop_last: bool = False,
        wrapper_class=DataChunk,
    ) -> IterDataPipe: ...

    # Functional form of 'CollatorIterDataPipe'
    def collate(
        self,
        conversion: Optional[
            Union[
                Callable[..., Any],
                Dict[Union[str, Any], Union[Callable, Any]],
            ]
        ] = default_collate,
        collate_fn: Optional[Callable] = None,
    ) -> IterDataPipe: ...

    # Functional form of 'ConcaterIterDataPipe'
    def concat(self, *datapipes: IterDataPipe) -> IterDataPipe: ...

    # Functional form of 'DemultiplexerIterDataPipe'
    def demux(
        self,
        num_instances: int,
        classifier_fn: Callable[[T_co], Optional[int]],
        drop_none: bool = False,
        buffer_size: int = 1000,
    ) -> List[IterDataPipe]: ...

    # Functional form of 'FilterIterDataPipe'
    def filter(
        self,
        filter_fn: Callable,
        input_col=None,
    ) -> IterDataPipe: ...

    # Functional form of 'ForkerIterDataPipe'
    def fork(
        self,
        num_instances: int,
        buffer_size: int = 1000,
    ) -> List[IterDataPipe]: ...

    # Functional form of 'GrouperIterDataPipe'
    def groupby(
        self,
        group_key_fn: Callable[[T_co], Any],
        *,
        keep_key: bool = False,
        buffer_size: int = 10000,
        group_size: Optional[int] = None,
        guaranteed_group_size: Optional[int] = None,
        drop_remaining: bool = False,
    ) -> IterDataPipe: ...

    # Functional form of 'FileListerIterDataPipe'
    def list_files(
        self,
        masks: Union[str, List[str]] = '',
        *,
        recursive: bool = False,
        abspath: bool = False,
        non_deterministic: bool = False,
        length: int = -1,
    ) -> IterDataPipe: ...

    # Functional form of 'MapperIterDataPipe'
    def map(
        self,
        fn: Callable,
        input_col=None,
        output_col=None,
    ) -> IterDataPipe: ...

    # Functional form of 'MultiplexerIterDataPipe'
    def mux(self, *datapipes) -> IterDataPipe: ...

    # Functional form of 'FileOpenerIterDataPipe'
    def open_files(
        self,
        mode: str = 'r',
        encoding: Optional[str] = None,
        length: int = -1,
    ) -> IterDataPipe: ...

    # Functional form of 'StreamReaderIterDataPipe'
    def read_from_stream(self, chunk=None) -> IterDataPipe: ...

    # Functional form of 'RoutedDecoderIterDataPipe'
    def routed_decode(
        self,
        *handlers: Callable,
        key_fn: Callable = ...,
    ) -> IterDataPipe: ...

    # Functional form of 'ShardingFilterIterDataPipe'
    def sharding_filter(self, sharding_group_filter=None) -> IterDataPipe: ...

    # Functional form of 'ShufflerIterDataPipe'
    def shuffle(
        self,
        *,
        buffer_size: int = 10000,
        unbatch_level: int = 0,
    ) -> IterDataPipe: ...

    # Functional form of 'UnBatcherIterDataPipe'
    def unbatch(self, unbatch_level: int = 1) -> IterDataPipe: ...

    # Functional form of 'ZipperIterDataPipe'
    def zip(self, *datapipes: IterDataPipe) -> IterDataPipe: ...

class DFIterDataPipe(IterDataPipe):
    # Stub for an IterDataPipe subclass. The only member declared here is the
    # private `_is_dfpipe` method; its return type is left unannotated.
    def _is_dfpipe(self): ...


class _DataPipeSerializationWrapper:
    # Stub for a private wrapper constructed from a single `datapipe`
    # argument. It declares the pickling hooks (`__getstate__` /
    # `__setstate__`) and `__len__`; parameters and return types are
    # unannotated, and all bodies are elided.
    def __init__(self, datapipe): ...
    def __getstate__(self): ...
    def __setstate__(self, state): ...
    def __len__(self): ...


class _IterDataPipeSerializationWrapper(_DataPipeSerializationWrapper, IterDataPipe):
    # Combines the serialization wrapper with IterDataPipe and additionally
    # declares `__iter__` (return type unannotated, body elided).
    def __iter__(self): ...


class _MapDataPipeSerializationWrapper(_DataPipeSerializationWrapper, MapDataPipe):
    # Combines the serialization wrapper with MapDataPipe and additionally
    # declares index access via `__getitem__` (unannotated, body elided).
    def __getitem__(self, idx): ...


class DataChunk(list, Generic[T]):
    """A ``list`` subclass that also keeps the object it was built from.

    The list contents are populated from ``items``, and the very same
    ``items`` object is stored on ``self.items`` so that it can be iterated
    separately through :meth:`raw_iterator`.
    """

    def __init__(self, items):
        """Fill the list from ``items`` and remember ``items`` itself.

        NOTE: ``items`` is stored as-is, not copied. If it is a one-shot
        iterator, building the list consumes it and :meth:`raw_iterator`
        will have nothing left to yield.
        """
        super().__init__(items)
        self.items = items

    def as_str(self, indent=''):
        """Return ``indent`` followed by the elements rendered as ``[a, b, ...]``.

        Elements are obtained through ``iter(self)`` (so an overridden
        ``__iter__`` is honored) and converted with ``str``.
        """
        return indent + "[" + ", ".join(str(i) for i in iter(self)) + "]"

    def __iter__(self) -> Iterator[T]:
        """Yield the list's elements in order."""
        yield from super().__iter__()

    def raw_iterator(self) -> Iterator[T]:
        """Yield the elements of the original ``items`` object in order.

        This is a generator, so the return annotation is ``Iterator[T]``
        rather than a bare ``T``.
        """
        yield from self.items