Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
import os
import typing as t
from sarus_data_spec import typing as st
from sarus_data_spec.manager.computations.remote.arrow import (
ToArrowComputation,
)
from sarus_data_spec.manager.computations.remote.bounds import (
BoundsComputation,
)
from sarus_data_spec.manager.computations.remote.cache_scalar import (
CacheScalarComputation,
)
from sarus_data_spec.manager.computations.remote.links import LinksComputation
from sarus_data_spec.manager.computations.remote.marginals import (
MarginalsComputation,
)
from sarus_data_spec.manager.computations.remote.multiplicity import (
MultiplicityComputation,
)
from sarus_data_spec.manager.computations.remote.parquet import (
ToParquetComputation,
)
from sarus_data_spec.manager.computations.remote.schema import (
SchemaComputation,
)
from sarus_data_spec.manager.computations.remote.size import SizeComputation
from sarus_data_spec.manager.computations.remote.sql import SQLComputation
from sarus_data_spec.manager.computations.remote.to_sql import ToSQLComputation
from sarus_data_spec.manager.computations.remote.push_sql import (
PushSQLComputation,
)
from sarus_data_spec.manager.computations.remote.value import ValueComputation
from sarus_data_spec.manager.private_manager import PrivateManager
from sarus_data_spec.storage.typing import Storage
import sarus_data_spec.protobuf as sp
import sarus_data_spec.status as stt
import sarus_data_spec.storage.typing as storage_typing
# Module-level settings read once, at import time, from the environment.
# Each value is either the raw environment string or the integer fallback,
# which is why consumers pass it through int() before using it.
# NOTE(review): units are presumably seconds -- confirm against the callers.
computation_timeout = os.getenv("API_COMPUTATION_TIMEOUT", 600)
computation_max_delay = os.getenv("API_COMPUTATION_MAX_DELAY", 10)
class ApiManager(PrivateManager):
    """Manager that hands its computations over to another manager.

    The delegate is supplied at construction time and can be retrieved
    through the ``computing_manager`` method.
    """

    def __init__(
        self,
        storage: storage_typing.Storage,
        protobuf: sp.Manager,
        computing_manager: PrivateManager,
    ) -> None:
        """Store the delegate and build one remote computation per task.

        Args:
            storage: storage forwarded to the parent manager.
            protobuf: manager protobuf forwarded to the parent manager.
            computing_manager: manager every computation is bound to.
        """
        super().__init__(storage, protobuf)
        self._computing_manager = computing_manager
        delegate = self._computing_manager

        self.schema_computation = SchemaComputation(delegate)

        # The arrow computation receives its own parquet computation,
        # distinct from the one exposed as ``to_parquet_computation``.
        arrow_parquet = ToParquetComputation(delegate)
        self.to_arrow_computation = ToArrowComputation(delegate, arrow_parquet)
        self.to_parquet_computation = ToParquetComputation(delegate)

        # Likewise the value computation gets a dedicated cache-scalar
        # computation rather than sharing ``cache_scalar_computation``.
        self.cache_scalar_computation = CacheScalarComputation(delegate)
        value_cache = CacheScalarComputation(delegate)
        self.value_computation = ValueComputation(delegate, value_cache)

        self.to_sql_computation = ToSQLComputation(delegate)
        self.push_sql_computation = PushSQLComputation(delegate)
        # And the SQL computation wraps a dedicated to-SQL computation.
        sql_source = ToSQLComputation(delegate)
        self._sql_computation = SQLComputation(delegate, sql_source)

        self.multiplicity_computation = MultiplicityComputation(delegate)
        self.size_computation = SizeComputation(delegate)
        self.bounds_computation = BoundsComputation(delegate)
        self.marginals_computation = MarginalsComputation(delegate)
        self.links_computation = LinksComputation(delegate)

    def computing_manager(self) -> PrivateManager:
        """Return the manager to which task computations are delegated."""
        return self._computing_manager

    def status(
        self, dataspec: st.DataSpec, task_name: t.Optional[str] = None
    ) -> t.Optional[st.Status]:
        """Return the last status recorded for the delegate manager.

        Args:
            dataspec: dataspec whose status is looked up.
            task_name: optional task used to narrow the lookup.

        Returns:
            Whatever ``stt.last_status`` yields for the delegate manager,
            which may be ``None``.
        """
        delegate = self.computing_manager()
        return stt.last_status(dataspec, manager=delegate, task=task_name)

    def computation_timeout(self, dataspec: st.DataSpec) -> int:
        """Return the timeout to apply to a computation on ``dataspec``.

        Dataspecs produced by one of the transforms listed below get
        five times the configured base timeout; all others get the base
        timeout unchanged.
        """
        extended = dataspec.is_transformed() and dataspec.transform().name() in (
            "User Settings",
            "automatic_user_settings",
            "Protect",
            "Synthetic data",
        )
        factor = 5 if extended else 1
        # Inside a method the bare name resolves to the module-level
        # setting, not to this method.
        return factor * int(computation_timeout)

    def computation_max_delay(self, dataspec: st.DataSpec) -> int:
        """Return the configured maximum delay, whatever the dataspec."""
        # Resolves to the module-level setting; ``dataspec`` is not used.
        max_delay = int(computation_max_delay)
        return max_delay
def manager(
    storage: Storage, computing_manager: PrivateManager, **kwargs: str
) -> ApiManager:
    """Build an ``ApiManager`` bound to ``computing_manager``.

    Args:
        storage: storage handed to the new manager.
        computing_manager: manager the new one delegates to.
        **kwargs: extra string properties merged into the manager
            protobuf; a ``type`` entry overrides the default
            ``"api_manager"`` value.

    Returns:
        The newly created ``ApiManager``.
    """
    manager_proto = sp.Manager(
        properties={"type": "api_manager", **kwargs}
    )
    return ApiManager(storage, manager_proto, computing_manager)