Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
import logging
import os
import typing as t

from sarus_data_spec import typing as st
from sarus_data_spec.manager.computations.local.arrow import ToArrowComputation
from sarus_data_spec.manager.computations.local.cache_scalar import (
    CacheScalarComputation,
)
from sarus_data_spec.manager.computations.local.parquet import (
    ToParquetComputation,
)
from sarus_data_spec.manager.computations.local.schema import SchemaComputation
from sarus_data_spec.manager.computations.local.sql import SQLComputation
from sarus_data_spec.manager.computations.local.to_sql import ToSQLComputation
from sarus_data_spec.manager.computations.local.push_sql import (
    PushSQLComputation,
)
from sarus_data_spec.manager.computations.local.value import ValueComputation
from sarus_data_spec.manager.ops.foreign_keys import fk_visitor
from sarus_data_spec.manager.ops.primary_keys import pk_visitor
from sarus_data_spec.manager.ops.processor.routing import TransformedDataset
from sarus_data_spec.manager.ops.source.routing import source_dataset_schema
from sarus_data_spec.manager.private_manager import PrivateManager
from sarus_data_spec.storage.typing import Storage
import sarus_data_spec.protobuf as sp
import sarus_data_spec.storage.typing as storage_typing
try:
from sarus_data_spec.manager.computations.local.bounds import (
BoundsComputation,
)
from sarus_data_spec.manager.computations.local.links import (
LinksComputation,
)
from sarus_data_spec.manager.computations.local.marginals import (
MarginalsComputation,
)
from sarus_data_spec.manager.computations.local.multiplicity import (
MultiplicityComputation,
)
from sarus_data_spec.manager.computations.local.size import SizeComputation
except Exception:
pass
# Worker-wide limits, read once from the environment when this module is
# imported. A value taken from the environment is a string while the
# fallbacks are ints, so consumers normalise with int() at the point of use.
# NOTE(review): units are not stated here — presumably seconds; confirm.
computation_timeout = os.environ.get("WORKER_COMPUTATION_TIMEOUT", 600)
computation_max_delay = os.environ.get("WORKER_COMPUTATION_MAX_DELAY", 10)
class WorkerManager(PrivateManager):
    """Manager that always executes computations
    in its process"""

    # Transform names whose specs are granted a longer computation timeout.
    _LONG_TIMEOUT_TRANSFORMS: t.FrozenSet[str] = frozenset(
        {"User Settings", "automatic_user_settings", "Protect"}
    )
    # Multiplier applied to the base timeout for the transforms above.
    _LONG_TIMEOUT_FACTOR: int = 5

    def __init__(
        self, storage: storage_typing.Storage, protobuf: sp.Manager
    ) -> None:
        """Create the manager and instantiate its computation objects.

        Args:
            storage: storage backend, forwarded to ``PrivateManager``.
            protobuf: manager protobuf, forwarded to ``PrivateManager``.

        The SQL computations and the optional ones (multiplicity, size,
        bounds, marginals, links) are set up best-effort: if one of them
        fails to construct (for instance because the optional imports at
        module level failed, leaving the class name undefined), the failure
        is logged and the attributes after it are left unset.
        """
        super().__init__(storage, protobuf)
        self.schema_computation = SchemaComputation(self)
        # Composite computations reuse the instances exposed as attributes
        # rather than building private duplicates, so this manager holds
        # exactly one object per computation type.
        self.to_parquet_computation = ToParquetComputation(self)
        self.to_arrow_computation = ToArrowComputation(
            self, parquet_computation=self.to_parquet_computation
        )
        self.cache_scalar_computation = CacheScalarComputation(self)
        self.value_computation = ValueComputation(
            self, self.cache_scalar_computation
        )
        try:
            self.to_sql_computation = ToSQLComputation(self)
            self._sql_computation = SQLComputation(
                self, self.to_sql_computation
            )
            self.push_sql_computation = PushSQLComputation(self)
            self.multiplicity_computation = MultiplicityComputation(self)
            self.size_computation = SizeComputation(self)
            self.bounds_computation = BoundsComputation(self)
            self.marginals_computation = MarginalsComputation(self)
            self.links_computation = LinksComputation(self)
        except Exception:
            # Still best-effort so the worker starts without the optional
            # components, but no longer silent: otherwise the missing
            # attribute only surfaces later as an unexplained AttributeError.
            logging.getLogger(__name__).warning(
                "WorkerManager: some computations could not be set up; "
                "operations depending on them will be unavailable.",
                exc_info=True,
            )

    async def async_schema_op(self, dataset: st.Dataset) -> st.Schema:
        """Compute the schema of ``dataset``.

        Transformed datasets are routed through ``TransformedDataset``;
        all others through ``source_dataset_schema``.
        """
        if dataset.is_transformed():
            return await TransformedDataset(dataset).schema()
        return await source_dataset_schema(dataset=dataset)

    async def async_foreign_keys(
        self, dataset: st.Dataset
    ) -> t.Dict[st.Path, st.Path]:
        """Gets foreign keys from the schema"""
        schema = await self.async_schema(dataset)
        return fk_visitor(schema.type())

    async def async_primary_keys(self, dataset: st.Dataset) -> t.List[st.Path]:
        """Gets primary keys from the schema"""
        schema = await self.async_schema(dataset)
        return pk_visitor(schema.type())

    async def async_links(self, dataset: st.Dataset) -> t.Any:
        """Return the links computation result for ``dataset``.

        Relies on ``self.links_computation``, which is only set when the
        best-effort setup in ``__init__`` succeeded; presumably an
        AttributeError otherwise (unless a base class provides it — verify).
        """
        return await self.links_computation.task_result(dataset)

    def computation_timeout(self, dataspec: st.DataSpec) -> int:
        """Return the timeout allowed for computing ``dataspec``.

        The base value is the module-level ``computation_timeout`` setting
        (env ``WORKER_COMPUTATION_TIMEOUT``, default 600). Specs produced by
        one of ``_LONG_TIMEOUT_TRANSFORMS`` get ``_LONG_TIMEOUT_FACTOR``
        times the base value.
        """
        # ``computation_timeout`` here is the module-level setting, not this
        # method: names inside a method never resolve through class scope.
        base = int(computation_timeout)
        if (
            dataspec.is_transformed()
            and dataspec.transform().name() in self._LONG_TIMEOUT_TRANSFORMS
        ):
            return self._LONG_TIMEOUT_FACTOR * base
        return base

    def computation_max_delay(self, dataspec: st.DataSpec) -> int:
        """Return the module-level ``computation_max_delay`` setting as int.

        ``dataspec`` is accepted for interface compatibility and not used.
        """
        # Resolves to the module-level setting (see computation_timeout).
        return int(computation_max_delay)

    def sql_pushing_schema_prefix(self, dataset: st.Dataset) -> str:
        """Return the hard-coded schema prefix ``"st51_"``.

        ``dataset`` is not used; the prefix is the same for every dataset.
        """
        return "st51_"
def manager(storage: Storage, **kwargs: str) -> WorkerManager:
    """Build a ``WorkerManager`` on top of ``storage``.

    The manager protobuf's ``properties`` start from
    ``{"type": "worker_manager"}`` and are then extended with every keyword
    argument; a keyword with the same name overrides the default entry.
    """
    manager_proto = sp.Manager(
        properties={"type": "worker_manager", **kwargs}
    )
    return WorkerManager(storage, manager_proto)