# This base template ("datapipe.pyi.in") is generated from mypy stubgen with minimal editing for code injection
# The output file will be "datapipe.pyi". This is executed as part of torch/CMakeLists.txt
# Note that, for mypy, .pyi file takes precedent over .py file, such that we must define the interface for other
# classes/objects here, even though we are not injecting extra code into them at the moment.
from torch.utils.data.datapipes._typing import _DataPipeMeta, _IterDataPipeMeta
from torch.utils.data.datapipes._hook_iterator import _SnapshotState
from typing import Any, Callable, Dict, Generic, Iterator, List, Optional, TypeVar, Union
from torch.utils.data import Dataset, IterableDataset, default_collate
T_co = TypeVar('T_co', covariant=True)
T = TypeVar('T')
UNTRACABLE_DATAFRAME_PIPES: Any
class MapDataPipe(Dataset[T_co], metaclass=_DataPipeMeta):
functions: Dict[str, Callable] = ...
reduce_ex_hook: Optional[Callable] = ...
getstate_hook: Optional[Callable] = ...
str_hook: Optional[Callable] = ...
repr_hook: Optional[Callable] = ...
def __getattr__(self, attribute_name: Any): ...
@classmethod
def register_function(cls, function_name: Any, function: Any) -> None: ...
@classmethod
def register_datapipe_as_function(cls, function_name: Any, cls_to_register: Any): ...
def __getstate__(self): ...
def __reduce_ex__(self, *args: Any, **kwargs: Any): ...
@classmethod
def set_getstate_hook(cls, hook_fn: Any) -> None: ...
@classmethod
def set_reduce_ex_hook(cls, hook_fn: Any) -> None: ...
# Functional form of 'BatcherMapDataPipe'
def batch(self, batch_size: int, drop_last: bool = False, wrapper_class=DataChunk) -> MapDataPipe: ...
# Functional form of 'ConcaterMapDataPipe'
def concat(self, *datapipes: MapDataPipe) -> MapDataPipe: ...
# Functional form of 'MapperMapDataPipe'
def map(self, fn: Callable= ...) -> MapDataPipe: ...
# Functional form of 'ShufflerIterDataPipe'
def shuffle(self, *, indices: Optional[List] = None) -> IterDataPipe: ...
# Functional form of 'ZipperMapDataPipe'
def zip(self, *datapipes: MapDataPipe[T_co]) -> MapDataPipe: ...
class IterDataPipe(IterableDataset[T_co], metaclass=_IterDataPipeMeta):
functions: Dict[str, Callable] = ...
reduce_ex_hook: Optional[Callable] = ...
getstate_hook: Optional[Callable] = ...
str_hook: Optional[Callable] = ...
repr_hook: Optional[Callable] = ...
_number_of_samples_yielded: int = ...
_snapshot_state: _SnapshotState = _SnapshotState.Iterating
_fast_forward_iterator: Optional[Iterator] = ...
def __getattr__(self, attribute_name: Any): ...
@classmethod
def register_function(cls, function_name: Any, function: Any) -> None: ...
@classmethod
def register_datapipe_as_function(cls, function_name: Any, cls_to_register: Any, enable_df_api_tracing: bool = ...): ...
def __getstate__(self): ...
def __reduce_ex__(self, *args: Any, **kwargs: Any): ...
@classmethod
def set_getstate_hook(cls, hook_fn: Any) -> None: ...
@classmethod
def set_reduce_ex_hook(cls, hook_fn: Any) -> None: ...
# Functional form of 'BatcherIterDataPipe'
def batch(self, batch_size: int, drop_last: bool = False, wrapper_class=DataChunk) -> IterDataPipe: ...
# Functional form of 'CollatorIterDataPipe'
def collate(self, conversion: Optional[Union[Callable[..., Any],Dict[Union[str, Any], Union[Callable, Any]],]] = default_collate, collate_fn: Optional[Callable] = None) -> IterDataPipe: ...
# Functional form of 'ConcaterIterDataPipe'
def concat(self, *datapipes: IterDataPipe) -> IterDataPipe: ...
# Functional form of 'DemultiplexerIterDataPipe'
def demux(self, num_instances: int, classifier_fn: Callable[[T_co], Optional[int]], drop_none: bool = False, buffer_size: int = 1000) -> List[IterDataPipe]: ...
# Functional form of 'FilterIterDataPipe'
def filter(self, filter_fn: Callable, input_col=None) -> IterDataPipe: ...
# Functional form of 'ForkerIterDataPipe'
def fork(self, num_instances: int, buffer_size: int = 1000) -> List[IterDataPipe]: ...
# Functional form of 'GrouperIterDataPipe'
def groupby(self, group_key_fn: Callable[[T_co], Any], *, keep_key: bool = False, buffer_size: int = 10000, group_size: Optional[int] = None, guaranteed_group_size: Optional[int] = None, drop_remaining: bool = False) -> IterDataPipe: ...
# Functional form of 'FileListerIterDataPipe'
def list_files(self, masks: Union[str, List[str]] = '', *, recursive: bool = False, abspath: bool = False, non_deterministic: bool = False, length: int = -1) -> IterDataPipe: ...
# Functional form of 'MapperIterDataPipe'
def map(self, fn: Callable, input_col=None, output_col=None) -> IterDataPipe: ...
# Functional form of 'MultiplexerIterDataPipe'
def mux(self, *datapipes) -> IterDataPipe: ...
# Functional form of 'FileOpenerIterDataPipe'
def open_files(self, mode: str = 'r', encoding: Optional[str] = None, length: int = -1) -> IterDataPipe: ...
# Functional form of 'StreamReaderIterDataPipe'
def read_from_stream(self, chunk=None) -> IterDataPipe: ...
# Functional form of 'RoutedDecoderIterDataPipe'
def routed_decode(self, *handlers: Callable, key_fn: Callable= ...) -> IterDataPipe: ...
# Functional form of 'ShardingFilterIterDataPipe'
def sharding_filter(self, sharding_group_filter=None) -> IterDataPipe: ...
# Functional form of 'ShufflerIterDataPipe'
def shuffle(self, *, buffer_size: int = 10000, unbatch_level: int = 0) -> IterDataPipe: ...
# Functional form of 'UnBatcherIterDataPipe'
def unbatch(self, unbatch_level: int = 1) -> IterDataPipe: ...
# Functional form of 'ZipperIterDataPipe'
def zip(self, *datapipes: IterDataPipe) -> IterDataPipe: ...
class DFIterDataPipe(IterDataPipe):
def _is_dfpipe(self): ...
class _DataPipeSerializationWrapper:
def __init__(self, datapipe): ...
def __getstate__(self): ...
def __setstate__(self, state): ...
def __len__(self): ...
class _IterDataPipeSerializationWrapper(_DataPipeSerializationWrapper, IterDataPipe):
def __iter__(self): ...
class _MapDataPipeSerializationWrapper(_DataPipeSerializationWrapper, MapDataPipe):
def __getitem__(self, idx): ...
class DataChunk(list, Generic[T]):
def __init__(self, items):
super().__init__(items)
self.items = items
def as_str(self, indent=''):
res = indent + "[" + ", ".join(str(i) for i in iter(self)) + "]"
return res
def __iter__(self) -> Iterator[T]:
for i in super().__iter__():
yield i
def raw_iterator(self) -> T: # type: ignore[misc]
for i in self.items:
yield i