# Repository URL to install this package:
# Version: 4.5.4.dev1
import logging
import time
import traceback
import typing as t
from sarus_data_spec import typing as st
from sarus_data_spec.constants import LINKS_TASK
from sarus_data_spec.links import Links
from sarus_data_spec.manager.cache_utils import lru_caching
from sarus_data_spec.manager.computations.local.base import LocalComputation
from sarus_data_spec.manager.ops.source.marginals import get_links
from sarus_data_spec.status import (
DataSpecErrorStatus,
error,
processing,
ready,
)
logger = logging.getLogger(__name__)
class LinksComputation(LocalComputation[st.Links]):
    """Local computation in charge of the links task of a dataset.

    `prepare` runs `get_links` on the dataset and reports the outcome
    through the `processing` / `error` / `ready` status helpers, while
    `result_from_stage_properties` fetches the stored links back from the
    uuid recorded in the stage properties.
    """

    task_name = LINKS_TASK

    def _mark_links_error(
        self, dataspec: st.Dataset, message: str, relaunch: t.Any
    ) -> None:
        """Call `error` for this task with the message and relaunch flag.

        Args:
            dataspec: dataset whose links computation failed.
            message: formatted traceback stored under the "message" key.
            relaunch: flag stored, stringified, under the "relaunch" key.
        """
        error(
            dataspec=dataspec,
            manager=self.computing_manager(),
            task=self.task_name,
            properties={
                "message": message,
                "relaunch": str(relaunch),
            },
        )

    async def prepare(self, dataspec: st.DataSpec) -> None:
        """Compute the links of `dataspec` and report the task status.

        Args:
            dataspec: the dataspec to process; it is cast to `st.Dataset`.

        Raises:
            DataSpecErrorStatus: re-raised unchanged when `get_links` raises
                it, or raised with `(False, traceback)` and chained to the
                original error when any other exception occurs.
        """
        dataspec = t.cast(st.Dataset, dataspec)
        processing(
            dataspec=dataspec,
            manager=self.computing_manager(),
            task=self.task_name,
        )
        try:
            logger.info("STARTED LINKS %s", dataspec.uuid())
            start = time.perf_counter()
            links = await get_links(dataspec)
        except DataSpecErrorStatus as exception:
            # Keep the relaunch hint carried by the original status error.
            self._mark_links_error(
                dataspec, traceback.format_exc(), exception.relaunch
            )
            raise
        except Exception as exception:
            # Format the traceback once so that the stored message and the
            # raised status error carry exactly the same text.
            message = traceback.format_exc()
            self._mark_links_error(dataspec, message, False)
            raise DataSpecErrorStatus((False, message)) from exception
        else:
            end = time.perf_counter()
            logger.info(
                "FINISHED LINKS %s (%.2fs)", dataspec.uuid(), end - start
            )
            ready(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
                properties={"uuid": links.uuid()},
            )

    async def result_from_stage_properties(
        self,
        dataspec: st.DataSpec,
        properties: t.Mapping[str, str],
        **kwargs: t.Any,
    ) -> st.Links:
        """Return the links referenced by the "uuid" stage property.

        Args:
            dataspec: dataspec whose storage is queried.
            properties: stage properties; must contain the "uuid" key.
            **kwargs: accepted for interface compatibility, unused here.

        Returns:
            The object returned by the storage for that uuid, cast to
            `Links`.
        """
        return t.cast(
            Links,
            dataspec.storage().referrable(properties["uuid"]),
        )

    @lru_caching("computation", use_first_arg=True)
    async def task_result(
        self, dataspec: st.DataSpec, **kwargs: t.Any
    ) -> st.Links:
        """Delegate to the parent `task_result`, wrapped by `lru_caching`."""
        return await super().task_result(dataspec, **kwargs)