Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
Size: Mime:
import logging
import time
import traceback
import typing as t

from sarus_data_spec import typing as st
from sarus_data_spec.constants import LINKS_TASK
from sarus_data_spec.links import Links
from sarus_data_spec.manager.cache_utils import lru_caching
from sarus_data_spec.manager.computations.local.base import LocalComputation
from sarus_data_spec.manager.ops.source.marginals import get_links
from sarus_data_spec.status import (
    DataSpecErrorStatus,
    error,
    processing,
    ready,
)

logger = logging.getLogger(__name__)


class LinksComputation(LocalComputation[st.Links]):
    """Class responsible to compute links"""

    task_name = LINKS_TASK

    async def prepare(self, dataspec: st.DataSpec) -> None:
        dataspec = t.cast(st.Dataset, dataspec)
        processing(
            dataspec=dataspec,
            manager=self.computing_manager(),
            task=LINKS_TASK,
        )

        try:
            logger.info(f"STARTED LINKS {dataspec.uuid()}")
            start = time.perf_counter()
            links = await get_links(dataspec)
        except DataSpecErrorStatus as exception:
            error(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
                properties={
                    "message": traceback.format_exc(),
                    "relaunch": str(exception.relaunch),
                },
            )
            raise
        except Exception:
            error(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=self.task_name,
                properties={
                    "message": traceback.format_exc(),
                    "relaunch": str(False),
                },
            )
            raise DataSpecErrorStatus((False, traceback.format_exc()))
        else:
            end = time.perf_counter()
            logger.info(f"FINISHED LINKS {dataspec.uuid()} ({end-start:.2f}s)")
            ready(
                dataspec=dataspec,
                manager=self.computing_manager(),
                task=LINKS_TASK,
                properties={"uuid": links.uuid()},
            )

    async def result_from_stage_properties(
        self,
        dataspec: st.DataSpec,
        properties: t.Mapping[str, str],
        **kwargs: t.Any,
    ) -> st.Links:
        return t.cast(
            Links,
            dataspec.storage().referrable(properties["uuid"]),
        )

    @lru_caching("computation", use_first_arg=True)
    async def task_result(
        self, dataspec: st.DataSpec, **kwargs: t.Any
    ) -> st.Links:
        return await super().task_result(dataspec, **kwargs)