Repository URL to install this package:
|
Version:
2.0.0rc1 ▾
|
import math
import uuid
from typing import Any, Dict, Iterator, List, Optional, Tuple
import numpy as np
import ray
from ray.data._internal.block_list import BlockList
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.stats import DatasetStats, _get_or_create_stats_actor
from ray.data.block import (
Block,
BlockAccessor,
BlockExecStats,
BlockMetadata,
BlockPartitionMetadata,
MaybeBlockPartition,
)
from ray.data.context import DatasetContext
from ray.data.datasource import ReadTask
from ray.types import ObjectRef
class LazyBlockList(BlockList):
"""A BlockList that submits tasks lazily on-demand.
This BlockList is used for implementing read operations (e.g., to avoid
needing to read all files of a Dataset when the user is just wanting to
.take() the first few rows or view the schema).
"""
def __init__(
self,
tasks: List[ReadTask],
block_partition_refs: Optional[List[ObjectRef[MaybeBlockPartition]]] = None,
block_partition_meta_refs: Optional[
List[ObjectRef[BlockPartitionMetadata]]
] = None,
cached_metadata: Optional[List[BlockPartitionMetadata]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
stats_uuid: str = None,
*,
owned_by_consumer: bool,
):
"""Create a LazyBlockList on the provided read tasks.
Args:
tasks: The read tasks that will produce the blocks of this lazy block list.
block_partition_refs: An optional list of already submitted read task
futures (i.e. block partition refs). This should be the same length as
the tasks argument.
block_partition_meta_refs: An optional list of block partition metadata
refs. This should be the same length as the tasks argument.
cached_metadata: An optional list of already computed AND fetched metadata.
This serves as a cache of fetched block metadata.
ray_remote_args: Ray remote arguments for the read tasks.
stats_uuid: UUID for the dataset stats, used to group and fetch read task
stats. If not provided, a new UUID will be created.
"""
self._tasks = tasks
self._num_blocks = len(self._tasks)
if stats_uuid is None:
stats_uuid = uuid.uuid4()
self._stats_uuid = stats_uuid
self._execution_started = False
self._remote_args = ray_remote_args or {}
# Block partition metadata that have already been computed and fetched.
if cached_metadata is not None:
self._cached_metadata = cached_metadata
else:
self._cached_metadata = [None] * len(tasks)
# Block partition metadata that have already been computed.
if block_partition_meta_refs is not None:
self._block_partition_meta_refs = block_partition_meta_refs
else:
self._block_partition_meta_refs = [None] * len(tasks)
# Block partitions that have already been computed.
if block_partition_refs is not None:
self._block_partition_refs = block_partition_refs
else:
self._block_partition_refs = [None] * len(tasks)
assert len(tasks) == len(self._block_partition_refs), (
tasks,
self._block_partition_refs,
)
assert len(tasks) == len(self._block_partition_meta_refs), (
tasks,
self._block_partition_meta_refs,
)
assert len(tasks) == len(self._cached_metadata), (
tasks,
self._cached_metadata,
)
# Whether the block list is owned by consuming APIs, and if so it can be
# eagerly deleted after read by the consumer.
self._owned_by_consumer = owned_by_consumer
def get_metadata(self, fetch_if_missing: bool = False) -> List[BlockMetadata]:
"""Get the metadata for all blocks."""
if all(meta is not None for meta in self._cached_metadata):
# Always return fetched metadata if we already have it.
metadata = self._cached_metadata
elif not fetch_if_missing:
metadata = [
m if m is not None else t.get_metadata()
for m, t in zip(self._cached_metadata, self._tasks)
]
else:
_, metadata = self._get_blocks_with_metadata()
return metadata
def stats(self) -> DatasetStats:
"""Create DatasetStats for this LazyBlockList."""
return DatasetStats(
stages={"read": self.get_metadata(fetch_if_missing=False)},
parent=None,
needs_stats_actor=True,
stats_uuid=self._stats_uuid,
)
def copy(self) -> "LazyBlockList":
return LazyBlockList(
self._tasks.copy(),
block_partition_refs=self._block_partition_refs.copy(),
block_partition_meta_refs=self._block_partition_meta_refs.copy(),
cached_metadata=self._cached_metadata,
ray_remote_args=self._remote_args.copy(),
owned_by_consumer=self._owned_by_consumer,
stats_uuid=self._stats_uuid,
)
def clear(self):
"""Clears all object references (block partitions and base block partitions)
from this lazy block list.
"""
self._block_partition_refs = [None for _ in self._block_partition_refs]
self._block_partition_meta_refs = [
None for _ in self._block_partition_meta_refs
]
self._cached_metadata = [None for _ in self._cached_metadata]
def is_cleared(self) -> bool:
return all(ref is None for ref in self._block_partition_refs)
def _check_if_cleared(self):
pass # LazyBlockList can always be re-computed.
# Note: does not force execution prior to splitting.
def split(self, split_size: int) -> List["LazyBlockList"]:
num_splits = math.ceil(len(self._tasks) / split_size)
tasks = np.array_split(self._tasks, num_splits)
block_partition_refs = np.array_split(self._block_partition_refs, num_splits)
block_partition_meta_refs = np.array_split(
self._block_partition_meta_refs, num_splits
)
cached_metadata = np.array_split(self._cached_metadata, num_splits)
output = []
for t, b, m, c in zip(
tasks, block_partition_refs, block_partition_meta_refs, cached_metadata
):
output.append(
LazyBlockList(
t.tolist(),
b.tolist(),
m.tolist(),
c.tolist(),
owned_by_consumer=self._owned_by_consumer,
)
)
return output
# Note: does not force execution prior to splitting.
def split_by_bytes(self, bytes_per_split: int) -> List["BlockList"]:
output = []
cur_tasks, cur_blocks, cur_blocks_meta, cur_cached_meta = [], [], [], []
cur_size = 0
for t, b, bm, c in zip(
self._tasks,
self._block_partition_refs,
self._block_partition_meta_refs,
self._cached_metadata,
):
m = t.get_metadata()
if m.size_bytes is None:
raise RuntimeError(
"Block has unknown size, cannot use split_by_bytes()"
)
size = m.size_bytes
if cur_blocks and cur_size + size > bytes_per_split:
output.append(
LazyBlockList(
cur_tasks,
cur_blocks,
cur_blocks_meta,
cur_cached_meta,
owned_by_consumer=self._owned_by_consumer,
),
)
cur_tasks, cur_blocks, cur_blocks_meta, cur_cached_meta = [], [], [], []
cur_size = 0
cur_tasks.append(t)
cur_blocks.append(b)
cur_blocks_meta.append(bm)
cur_cached_meta.append(c)
cur_size += size
if cur_blocks:
output.append(
LazyBlockList(
cur_tasks,
cur_blocks,
cur_blocks_meta,
cur_cached_meta,
owned_by_consumer=self._owned_by_consumer,
)
)
return output
def truncate_by_rows(self, limit: int) -> "LazyBlockList":
"""Truncate the block list to the minimum number of blocks that contains at
least limit rows.
If the number of rows is not available, it will be treated as a 0-row block and
will be included in the truncated output.
"""
self._check_if_cleared()
out_tasks, out_blocks, out_blocks_meta, out_cached_meta = [], [], [], []
out_num_rows = 0
for t, b, bm, c in zip(
self._tasks,
self._block_partition_refs,
self._block_partition_meta_refs,
self._cached_metadata,
):
m = t.get_metadata()
num_rows = m.num_rows
if num_rows is None:
num_rows = 0
out_tasks.append(t)
out_blocks.append(b)
out_blocks_meta.append(bm)
out_cached_meta.append(c)
out_num_rows += num_rows
if out_num_rows >= limit:
break
return LazyBlockList(
out_tasks,
out_blocks,
out_blocks_meta,
out_cached_meta,
owned_by_consumer=self._owned_by_consumer,
)
# Note: does not force execution prior to division.
def divide(self, part_idx: int) -> ("LazyBlockList", "LazyBlockList"):
left = LazyBlockList(
self._tasks[:part_idx],
self._block_partition_refs[:part_idx],
self._block_partition_meta_refs[:part_idx],
self._cached_metadata[:part_idx],
owned_by_consumer=self._owned_by_consumer,
)
right = LazyBlockList(
self._tasks[part_idx:],
self._block_partition_refs[part_idx:],
self._block_partition_meta_refs[part_idx:],
self._cached_metadata[part_idx:],
owned_by_consumer=self._owned_by_consumer,
)
return left, right
def get_blocks(self) -> List[ObjectRef[Block]]:
"""Bulk version of iter_blocks().
Prefer calling this instead of the iter form for performance if you
don't need lazy evaluation.
"""
blocks, _ = self._get_blocks_with_metadata()
return blocks
def get_blocks_with_metadata(self) -> List[Tuple[ObjectRef[Block], BlockMetadata]]:
"""Bulk version of iter_blocks_with_metadata().
Prefer calling this instead of the iter form for performance if you
don't need lazy evaluation.
"""
blocks, metadata = self._get_blocks_with_metadata()
return list(zip(blocks, metadata))
def _get_blocks_with_metadata(
self,
) -> Tuple[List[ObjectRef[Block]], List[BlockMetadata]]:
"""Get all underlying block futures and concrete metadata.
This will block on the completion of the underlying read tasks and will fetch
all block metadata outputted by those tasks.
"""
context = DatasetContext.get_current()
block_refs, meta_refs = [], []
for block_ref, meta_ref in self._iter_block_partition_refs():
block_refs.append(block_ref)
meta_refs.append(meta_ref)
if context.block_splitting_enabled:
# If block splitting is enabled, fetch the partitions.
parts = ray.get(block_refs)
block_refs, metadata = [], []
for part in parts:
for block_ref, meta in part:
block_refs.append(block_ref)
metadata.append(meta)
self._cached_metadata = metadata
return block_refs, metadata
if all(meta is not None for meta in self._cached_metadata):
# Short-circuit on cached metadata.
return block_refs, self._cached_metadata
if not meta_refs:
# Short-circuit on empty set of block partitions.
assert not block_refs, block_refs
return [], []
read_progress_bar = ProgressBar("Read progress", total=len(meta_refs))
# Fetch the metadata in bulk.
# Handle duplicates (e.g. due to unioning the same dataset).
unique_meta_refs = set(meta_refs)
metadata = read_progress_bar.fetch_until_complete(list(unique_meta_refs))
ref_to_data = {
meta_ref: data for meta_ref, data in zip(unique_meta_refs, metadata)
}
metadata = [ref_to_data[meta_ref] for meta_ref in meta_refs]
self._cached_metadata = metadata
return block_refs, metadata
def compute_to_blocklist(self) -> BlockList:
"""Launch all tasks and return a concrete BlockList."""
blocks, metadata = self._get_blocks_with_metadata()
return BlockList(blocks, metadata, owned_by_consumer=self._owned_by_consumer)
def compute_first_block(self):
"""Kick off computation for the first block in the list.
This is useful if looking to support rapid lightweight interaction with a small
amount of the dataset.
"""
if self._tasks:
self._get_or_compute(0)
def ensure_metadata_for_first_block(self) -> Optional[BlockMetadata]:
"""Ensure that the metadata is fetched and set for the first block.
This will only block execution in order to fetch the post-read metadata for the
first block if the pre-read metadata for the first block has no schema.
Returns:
None if the block list is empty, the metadata for the first block otherwise.
"""
if not self._tasks:
return None
metadata = self._tasks[0].get_metadata()
if metadata.schema is not None:
# If pre-read schema is not null, we consider it to be "good enough" and use
# it.
return metadata
# Otherwise, we trigger computation (if needed), wait until the task completes,
# and fetch the block partition metadata.
try:
_, metadata_ref = next(self._iter_block_partition_refs())
except (StopIteration, ValueError):
# Dataset is empty (no blocks) or was manually cleared.
pass
else:
# This blocks until the underlying read task is finished.
metadata = ray.get(metadata_ref)
self._cached_metadata[0] = metadata
return metadata
def iter_blocks(self) -> Iterator[ObjectRef[Block]]:
"""Iterate over the blocks of this block list.
This blocks on the execution of the tasks generating block outputs.
The length of this iterator is not known until execution.
"""
self._check_if_cleared()
outer = self
class Iter:
def __init__(self):
self._base_iter = outer.iter_blocks_with_metadata()
def __iter__(self):
return self
def __next__(self):
ref, meta = next(self._base_iter)
assert isinstance(ref, ray.ObjectRef), (ref, meta)
return ref
return Iter()
def iter_blocks_with_metadata(
self,
block_for_metadata: bool = False,
) -> Iterator[Tuple[ObjectRef[Block], BlockMetadata]]:
"""Iterate over the blocks along with their metadata.
Note that, if block_for_metadata is False (default), this iterator returns
pre-read metadata from the ReadTasks given to this LazyBlockList so it doesn't
have to block on the execution of the read tasks. Therefore, the metadata may be
under-specified, e.g. missing schema or the number of rows. If fully-specified
block metadata is required, pass block_for_metadata=True.
The length of this iterator is not known until execution.
Args:
block_for_metadata: Whether we should block on the execution of read tasks
in order to obtain fully-specified block metadata.
Returns:
An iterator of block references and the corresponding block metadata.
"""
context = DatasetContext.get_current()
outer = self
class Iter:
def __init__(self):
self._base_iter = outer._iter_block_partition_refs()
self._pos = -1
self._buffer = []
def __iter__(self):
return self
def __next__(self):
while not self._buffer:
self._pos += 1
if context.block_splitting_enabled:
part_ref, _ = next(self._base_iter)
partition = ray.get(part_ref)
else:
block_ref, metadata_ref = next(self._base_iter)
if block_for_metadata:
# This blocks until the read task completes, returning
# fully-specified block metadata.
metadata = ray.get(metadata_ref)
else:
# This does not block, returning (possibly under-specified)
# pre-read block metadata.
metadata = outer._tasks[self._pos].get_metadata()
partition = [(block_ref, metadata)]
for block_ref, metadata in partition:
self._buffer.append((block_ref, metadata))
return self._buffer.pop(0)
return Iter()
def randomize_block_order(self, seed: Optional[int] = None) -> "LazyBlockList":
"""Randomizes the order of the blocks.
Args:
seed: Fix the random seed to use, otherwise one will be chosen
based on system randomness.
"""
import random
if seed is not None:
random.seed(seed)
zipped = list(
zip(
self._tasks,
self._block_partition_refs,
self._block_partition_meta_refs,
self._cached_metadata,
)
)
random.shuffle(zipped)
tasks, block_partition_refs, block_partition_meta_refs, cached_metadata = map(
list, zip(*zipped)
)
return LazyBlockList(
tasks,
block_partition_refs=block_partition_refs,
block_partition_meta_refs=block_partition_meta_refs,
cached_metadata=cached_metadata,
ray_remote_args=self._remote_args.copy(),
owned_by_consumer=self._owned_by_consumer,
stats_uuid=self._stats_uuid,
)
def _iter_block_partition_refs(
self,
) -> Iterator[
Tuple[ObjectRef[MaybeBlockPartition], ObjectRef[BlockPartitionMetadata]]
]:
"""Iterate over the block futures and their corresponding metadata futures.
This does NOT block on the execution of each submitted task.
"""
outer = self
class Iter:
def __init__(self):
self._pos = -1
def __iter__(self):
return self
def __next__(self):
self._pos += 1
if self._pos < len(outer._tasks):
return outer._get_or_compute(self._pos)
raise StopIteration
return Iter()
def _get_or_compute(
self,
i: int,
) -> Tuple[ObjectRef[MaybeBlockPartition], ObjectRef[BlockPartitionMetadata]]:
assert i < len(self._tasks), i
# Check if we need to compute more block_partition_refs.
if not self._block_partition_refs[i]:
# Exponentially increase the number computed per batch.
for j in range(max(i + 1, i * 2)):
if j >= len(self._block_partition_refs):
break
if not self._block_partition_refs[j]:
(
self._block_partition_refs[j],
self._block_partition_meta_refs[j],
) = self._submit_task(j)
assert self._block_partition_refs[i], self._block_partition_refs
assert self._block_partition_meta_refs[i], self._block_partition_meta_refs
return self._block_partition_refs[i], self._block_partition_meta_refs[i]
def _submit_task(
self, task_idx: int
) -> Tuple[ObjectRef[MaybeBlockPartition], ObjectRef[BlockPartitionMetadata]]:
"""Submit the task with index task_idx."""
stats_actor = _get_or_create_stats_actor()
if not self._execution_started:
stats_actor.record_start.remote(self._stats_uuid)
self._execution_started = True
task = self._tasks[task_idx]
return (
cached_remote_fn(_execute_read_task)
.options(num_returns=2, **self._remote_args)
.remote(
i=task_idx,
task=task,
context=DatasetContext.get_current(),
stats_uuid=self._stats_uuid,
stats_actor=stats_actor,
)
)
def _num_computed(self) -> int:
i = 0
for b in self._block_partition_refs:
if b is not None:
i += 1
return i
def _execute_read_task(
i: int,
task: ReadTask,
context: DatasetContext,
stats_uuid: str,
stats_actor: ray.actor.ActorHandle,
) -> Tuple[MaybeBlockPartition, BlockPartitionMetadata]:
DatasetContext._set_current(context)
stats = BlockExecStats.builder()
# Execute the task.
block = task()
metadata = task.get_metadata()
if context.block_splitting_enabled:
metadata.exec_stats = stats.build()
else:
metadata = BlockAccessor.for_block(block).get_metadata(
input_files=metadata.input_files, exec_stats=stats.build()
)
stats_actor.record_task.remote(stats_uuid, i, metadata)
return block, metadata