Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
sarus_data_spec / sarus_data_spec / manager / api_manager.py
Size: Mime:
import os
import typing as t

from sarus_data_spec import typing as st
from sarus_data_spec.manager.computations.remote.arrow import (
    ToArrowComputation,
)
from sarus_data_spec.manager.computations.remote.bounds import (
    BoundsComputation,
)
from sarus_data_spec.manager.computations.remote.cache_scalar import (
    CacheScalarComputation,
)
from sarus_data_spec.manager.computations.remote.links import LinksComputation
from sarus_data_spec.manager.computations.remote.marginals import (
    MarginalsComputation,
)
from sarus_data_spec.manager.computations.remote.multiplicity import (
    MultiplicityComputation,
)
from sarus_data_spec.manager.computations.remote.parquet import (
    ToParquetComputation,
)
from sarus_data_spec.manager.computations.remote.schema import (
    SchemaComputation,
)
from sarus_data_spec.manager.computations.remote.size import SizeComputation
from sarus_data_spec.manager.computations.remote.sql import SQLComputation
from sarus_data_spec.manager.computations.remote.to_sql import ToSQLComputation
from sarus_data_spec.manager.computations.remote.push_sql import (
    PushSQLComputation,
)
from sarus_data_spec.manager.computations.remote.value import ValueComputation
from sarus_data_spec.manager.private_manager import PrivateManager
from sarus_data_spec.storage.typing import Storage
import sarus_data_spec.protobuf as sp
import sarus_data_spec.status as stt
import sarus_data_spec.storage.typing as storage_typing

# Module-level settings consumed by ApiManager below. Each one is taken from
# the environment when the variable is set (yielding a str) and otherwise
# falls back to the integer default, so readers must convert with int().
# NOTE(review): units are not visible here -- presumably seconds; confirm.
computation_timeout = os.getenv("API_COMPUTATION_TIMEOUT", 600)
computation_max_delay = os.getenv("API_COMPUTATION_MAX_DELAY", 10)


class ApiManager(PrivateManager):
    """Manager whose computations are handed over to another manager.

    The manager receiving the work is supplied at construction time and can
    be retrieved afterwards through ``computing_manager()``.
    """

    def __init__(
        self,
        storage: storage_typing.Storage,
        protobuf: sp.Manager,
        computing_manager: PrivateManager,
    ) -> None:
        """Initialise the manager and its remote computation objects.

        Args:
            storage: forwarded to ``PrivateManager.__init__``.
            protobuf: forwarded to ``PrivateManager.__init__``.
            computing_manager: manager passed to every remote computation
                object created here.
        """
        super().__init__(storage, protobuf)
        self._computing_manager = computing_manager
        delegate = self._computing_manager

        self.schema_computation = SchemaComputation(delegate)

        # Composite computations (arrow, value, sql) each receive their own
        # freshly built inner computation rather than the instance that is
        # stored on ``self``.
        self.to_arrow_computation = ToArrowComputation(
            delegate, ToParquetComputation(delegate)
        )
        self.to_parquet_computation = ToParquetComputation(delegate)

        self.cache_scalar_computation = CacheScalarComputation(delegate)
        self.value_computation = ValueComputation(
            delegate, CacheScalarComputation(delegate)
        )

        self.to_sql_computation = ToSQLComputation(delegate)
        self.push_sql_computation = PushSQLComputation(delegate)
        self._sql_computation = SQLComputation(
            delegate, ToSQLComputation(delegate)
        )

        self.multiplicity_computation = MultiplicityComputation(delegate)
        self.size_computation = SizeComputation(delegate)
        self.bounds_computation = BoundsComputation(delegate)
        self.marginals_computation = MarginalsComputation(delegate)
        self.links_computation = LinksComputation(delegate)

    def computing_manager(self) -> PrivateManager:
        """Return the manager that was given to the constructor, i.e. the
        one the computations are delegated to."""
        return self._computing_manager

    def status(
        self, dataspec: st.DataSpec, task_name: t.Optional[str] = None
    ) -> t.Optional[st.Status]:
        """Look up the last status of ``dataspec`` on the computing manager.

        Args:
            dataspec: dataspec whose status is requested.
            task_name: optional task name, passed as ``task`` to
                ``stt.last_status``.

        Returns:
            The value produced by ``stt.last_status``, which may be ``None``.
        """
        delegate = self.computing_manager()
        return stt.last_status(dataspec, manager=delegate, task=task_name)

    def computation_timeout(self, dataspec: st.DataSpec) -> int:
        """Return the timeout to use for a computation on ``dataspec``.

        The base value is the module-level ``computation_timeout`` setting
        converted to ``int``. A transformed dataspec whose transform name is
        one of those listed below gets five times the base value.
        """
        extended_transforms = (
            "User Settings",
            "automatic_user_settings",
            "Protect",
            "Synthetic data",
        )
        needs_extension = (
            dataspec.is_transformed()
            and dataspec.transform().name() in extended_transforms
        )
        multiplier = 5 if needs_extension else 1
        # Inside a method the bare name resolves to the module-level setting,
        # not to this method of the same name.
        return multiplier * int(computation_timeout)

    def computation_max_delay(self, dataspec: st.DataSpec) -> int:
        """Return the module-level ``computation_max_delay`` setting as an
        ``int``; ``dataspec`` is not consulted."""
        max_delay = int(computation_max_delay)
        return max_delay


def manager(
    storage: Storage, computing_manager: PrivateManager, **kwargs: str
) -> ApiManager:
    """Build an ``ApiManager`` delegating to ``computing_manager``.

    Args:
        storage: storage handed to the new manager.
        computing_manager: manager the new ``ApiManager`` delegates to.
        **kwargs: extra string properties merged into the manager protobuf;
            a ``type`` entry here replaces the default ``"api_manager"``.

    Returns:
        The newly created ``ApiManager``.
    """
    # Unpacking after the default lets caller-supplied keys take precedence.
    properties = {"type": "api_manager", **kwargs}
    protobuf = sp.Manager(properties=properties)
    return ApiManager(storage, protobuf, computing_manager)