# Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, NuGet packages.
#
# Repository URL to install this package:
#
# Details
# Size: Mime:
import typing as t

from sarus_data_spec.context.base import Base
from sarus_data_spec.manager.typing import Manager
from sarus_data_spec.protobuf import type_name
from sarus_data_spec.storage.sql import Storage, sqlite
import sarus_data_spec.manager.base as smb
import sarus_data_spec.manager.worker_manager as swm
import sarus_data_spec.protobuf as sp


class WorkerContext(Base):
    """Context that owns a storage and a worker manager.

    On construction the context:

    * creates a ``Storage`` on top of ``sqlite(":memory:")`` bound to this
      context's factory,
    * asks ``swm.manager`` for a manager working on that storage,
    * registers with the factory a builder for ``sp.Manager`` protobufs that
      wraps them into ``smb.Base`` objects attached to the same storage.
    """

    def __init__(self, seed: int = 1234) -> None:
        """Set up the storage, the manager and the factory registration.

        Args:
            seed: Forwarded unchanged to the parent ``Base`` initializer.
        """
        super().__init__(seed=seed)

        self._storage = Storage(sqlite(":memory:"), self.factory())
        self._manager = swm.manager(self.storage())

        def build_manager(protobuf: t.Any) -> t.Any:
            # The storage is looked up at call time (not captured now), so
            # the builder always goes through the `storage()` accessor.
            current_storage = self.storage()
            return smb.Base(current_storage, t.cast(sp.Manager, protobuf))

        manager_type = type_name(sp.Manager)
        self.factory().register(manager_type, build_manager)

    def storage(self) -> Storage:
        """Return the storage created in ``__init__``."""
        return self._storage

    def manager(self) -> Manager:
        """Return the manager created in ``__init__``."""
        return self._manager