# Repository URL to install this package: (URL not present in this listing)
# Version: 4.5.4.dev1
import logging
import os
import time
import traceback
import typing as t
import pyarrow as pa
import pyarrow.parquet as pq
from sarus_data_spec import typing as st
from sarus_data_spec.constants import CACHE_PATH, TO_PARQUET_TASK
from sarus_data_spec.dataset import Dataset
from sarus_data_spec.status import DataSpecErrorStatus, error, ready
import sarus_data_spec.manager.computations.local.base as worker_comp
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Batch size passed to `async_to_arrow_op` when iterating over the dataset.
BATCH_SIZE = 10000
class ToParquetComputation(worker_comp.LocalComputation[str]):
    """Cache a dataset on disk as a parquet file.

    The record batches are obtained from the computing manager's
    ``async_to_arrow_op`` and written in a single call to the file returned
    by ``cache_path``. The result of the computation is that file path.
    """

    task_name = TO_PARQUET_TASK

    async def prepare(self, dataspec: st.DataSpec) -> None:
        """Write the dataset to parquet and record the outcome as a status.

        On success the task is marked ready, with the cache path stored
        under the ``CACHE_PATH`` property. On failure the task is marked in
        error, with the formatted traceback and the ``relaunch`` flag stored
        as properties, and a ``DataSpecErrorStatus`` is raised.

        Args:
            dataspec: The dataspec to cache; it is cast to ``Dataset``.

        Raises:
            DataSpecErrorStatus: Built from ``(relaunch, traceback)``.
                ``relaunch`` is copied from the caught exception when it is
                itself a ``DataSpecErrorStatus`` and is ``False`` for any
                other exception. The caught exception is attached as the
                explicit cause.
        """
        logger.info(f"STARTING TO_PARQUET {dataspec.uuid()}")
        start = time.perf_counter()
        try:
            iterator = await self.computing_manager().async_to_arrow_op(
                dataset=t.cast(Dataset, dataspec), batch_size=BATCH_SIZE
            )
            # NOTE(review): every batch is held in memory until the table
            # has been written.
            batches = [batch async for batch in iterator]
            # Without any batch there is no schema to reuse, so a table
            # built from an empty list is written instead.
            table = (
                pa.Table.from_batches(batches) if batches else pa.table([])
            )
            pq.write_table(
                table=table,
                where=self.cache_path(dataspec=dataspec),
                version="2.6",
            )
        except DataSpecErrorStatus as exception:
            # Format once so the stored status and the raised error carry
            # exactly the same traceback text.
            message = traceback.format_exc()
            self._record_failure(dataspec, message, exception.relaunch)
            raise DataSpecErrorStatus(
                (exception.relaunch, message)
            ) from exception
        except Exception as exception:
            message = traceback.format_exc()
            # Failures of unknown origin are never flagged for relaunch.
            self._record_failure(dataspec, message, False)
            raise DataSpecErrorStatus((False, message)) from exception
        else:
            end = time.perf_counter()
            logger.info(
                f"FINISHED TO_PARQUET {dataspec.uuid()} ({end-start:.2f}s)"
            )
            ready(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
                properties={CACHE_PATH: self.cache_path(dataspec)},
            )

    def _record_failure(
        self, dataspec: st.DataSpec, message: str, relaunch: t.Any
    ) -> None:
        """Mark this task in error with the given message and relaunch flag."""
        error(
            dataspec=dataspec,
            manager=self.computing_manager(),
            task=self.task_name,
            properties={
                "message": message,
                "relaunch": str(relaunch),
            },
        )

    async def result_from_stage_properties(
        self,
        dataspec: st.DataSpec,
        properties: t.Mapping[str, str],
        **kwargs: t.Any,
    ) -> str:
        """Return the cache path stored under ``CACHE_PATH`` in properties.

        Raises:
            KeyError: If ``properties`` has no ``CACHE_PATH`` entry.
        """
        return properties[CACHE_PATH]

    def cache_path(self, dataspec: st.DataSpec) -> str:
        """Return the path where the dataset is cached.

        The file is named ``<uuid>.parquet`` and is located in the
        ``parquet_dir()`` of the dataspec's manager.
        """
        return os.path.join(
            dataspec.manager().parquet_dir(), f"{dataspec.uuid()}.parquet"
        )