Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
sarus_data_spec / sarus_data_spec / manager / worker_manager.py
Size: Mime:
import logging
import os
import typing as t

from sarus_data_spec import typing as st
from sarus_data_spec.manager.computations.local.arrow import ToArrowComputation
from sarus_data_spec.manager.computations.local.cache_scalar import (
    CacheScalarComputation,
)
from sarus_data_spec.manager.computations.local.parquet import (
    ToParquetComputation,
)
from sarus_data_spec.manager.computations.local.schema import SchemaComputation
from sarus_data_spec.manager.computations.local.sql import SQLComputation
from sarus_data_spec.manager.computations.local.to_sql import ToSQLComputation
from sarus_data_spec.manager.computations.local.push_sql import (
    PushSQLComputation,
)
from sarus_data_spec.manager.computations.local.value import ValueComputation
from sarus_data_spec.manager.ops.foreign_keys import fk_visitor
from sarus_data_spec.manager.ops.primary_keys import pk_visitor
from sarus_data_spec.manager.ops.processor.routing import TransformedDataset
from sarus_data_spec.manager.ops.source.routing import source_dataset_schema
from sarus_data_spec.manager.private_manager import PrivateManager
from sarus_data_spec.storage.typing import Storage
import sarus_data_spec.protobuf as sp
import sarus_data_spec.storage.typing as storage_typing

# Best-effort import of additional computation classes. If any import in the
# group raises, the error is suppressed and the names that were not bound yet
# stay undefined in this module. WorkerManager.__init__ references these names
# inside its own try/except, so a missing name does not break construction.
try:
    from sarus_data_spec.manager.computations.local.bounds import (
        BoundsComputation,
    )
    from sarus_data_spec.manager.computations.local.links import (
        LinksComputation,
    )
    from sarus_data_spec.manager.computations.local.marginals import (
        MarginalsComputation,
    )
    from sarus_data_spec.manager.computations.local.multiplicity import (
        MultiplicityComputation,
    )
    from sarus_data_spec.manager.computations.local.size import SizeComputation
except Exception:
    # Deliberately swallowed: the module must stay importable even when
    # these computations cannot be imported.
    pass


# Worker settings, overridable through the environment. The values are kept
# as returned by the lookup (a str when the variable is set, the int default
# otherwise); consumers convert them with int() at the point of use.
computation_timeout = os.getenv("WORKER_COMPUTATION_TIMEOUT", 600)
computation_max_delay = os.getenv("WORKER_COMPUTATION_MAX_DELAY", 10)


class WorkerManager(PrivateManager):
    """Manager that always executes computations in its own process.

    The constructor attaches one computation object per kind of task. The
    schema, arrow, parquet, cached-scalar and value computations are always
    created. The SQL-related and statistics computations (multiplicity, size,
    bounds, marginals, links) are created on a best-effort basis: if creating
    any of them fails, that attribute and the ones after it are left unset.
    """

    def __init__(
        self, storage: storage_typing.Storage, protobuf: sp.Manager
    ) -> None:
        """Initialise the parent manager and attach the computations.

        Args:
            storage: storage forwarded to the parent constructor.
            protobuf: manager protobuf forwarded to the parent constructor.
        """
        super().__init__(storage, protobuf)
        self.schema_computation = SchemaComputation(self)
        self.to_arrow_computation = ToArrowComputation(
            self, parquet_computation=ToParquetComputation(self)
        )
        self.to_parquet_computation = ToParquetComputation(self)
        self.cache_scalar_computation = CacheScalarComputation(self)
        self.value_computation = ValueComputation(
            self, CacheScalarComputation(self)
        )
        try:
            self._sql_computation = SQLComputation(
                self, ToSQLComputation(self)
            )
            self.to_sql_computation = ToSQLComputation(self)
            self.push_sql_computation = PushSQLComputation(self)
            self.multiplicity_computation = MultiplicityComputation(self)
            self.size_computation = SizeComputation(self)
            self.bounds_computation = BoundsComputation(self)
            self.marginals_computation = MarginalsComputation(self)
            self.links_computation = LinksComputation(self)
        except Exception:
            # Still best-effort: construction must not fail when one of these
            # computations is unavailable (for instance when its class could
            # not be imported and the name is undefined). The failure is
            # recorded instead of being dropped silently, so that a later
            # AttributeError on one of the attributes can be traced back.
            logging.getLogger(__name__).debug(
                "Optional computations could not be fully initialised; "
                "the remaining computation attributes are left unset.",
                exc_info=True,
            )

    async def async_schema_op(self, dataset: st.Dataset) -> st.Schema:
        """Compute the schema of `dataset`.

        A transformed dataset is handled by `TransformedDataset`; any other
        dataset is handled by `source_dataset_schema`.
        """
        if dataset.is_transformed():
            return await TransformedDataset(dataset).schema()
        return await source_dataset_schema(dataset=dataset)

    async def async_foreign_keys(
        self, dataset: st.Dataset
    ) -> t.Dict[st.Path, st.Path]:
        """Gets foreign keys from the schema"""
        schema = await self.async_schema(dataset)
        return fk_visitor(schema.type())

    async def async_primary_keys(self, dataset: st.Dataset) -> t.List[st.Path]:
        """Gets primary keys from the schema"""
        schema = await self.async_schema(dataset)
        return pk_visitor(schema.type())

    async def async_links(self, dataset: st.Dataset) -> t.Any:
        """Return the task result of the links computation for `dataset`.

        Raises:
            AttributeError: if `links_computation` was not set because the
                best-effort initialisation in `__init__` failed.
        """
        return await self.links_computation.task_result(dataset)

    def computation_timeout(self, dataspec: st.DataSpec) -> int:
        """Return the timeout to apply to the computation of `dataspec`.

        The base value is the module-level `computation_timeout` setting
        converted to int. It is multiplied by 5 when the dataspec is
        transformed and its transform is named "User Settings",
        "automatic_user_settings" or "Protect".
        """
        # Inside a method body the bare name resolves to the module-level
        # setting, not to this method.
        if dataspec.is_transformed() and dataspec.transform().name() in (
            "User Settings",
            "automatic_user_settings",
            "Protect",
        ):
            return 5 * int(computation_timeout)
        return int(computation_timeout)

    def computation_max_delay(self, dataspec: st.DataSpec) -> int:
        """Return the module-level `computation_max_delay` setting as an int.

        The `dataspec` argument is not used by this implementation.
        """
        return int(computation_max_delay)

    def sql_pushing_schema_prefix(self, dataset: st.Dataset) -> str:
        """Return the fixed schema prefix "st51_"; `dataset` is not used."""
        return "st51_"


def manager(storage: Storage, **kwargs: str) -> WorkerManager:
    """Build a `WorkerManager` on top of `storage`.

    The manager protobuf carries a "type" property set to "worker_manager",
    plus every keyword argument as an extra property. A keyword argument
    named "type" replaces the default value.
    """
    manager_properties = {"type": "worker_manager", **kwargs}
    manager_proto = sp.Manager(properties=manager_properties)
    return WorkerManager(storage, manager_proto)