Repository URL to install this package:
|
Version:
4.5.4.dev1 ▾
|
import typing as t
from sarus_data_spec.context.base import Base
from sarus_data_spec.manager.typing import Manager
from sarus_data_spec.protobuf import type_name
from sarus_data_spec.storage.sql import Storage, sqlite
import sarus_data_spec.manager.base as smb
import sarus_data_spec.manager.worker_manager as swm
import sarus_data_spec.protobuf as sp
class WorkerContext(Base):
    """Context that owns a SQLite-backed storage and a worker manager.

    Constructing the context creates the storage and the manager, then
    registers a builder for ``sp.Manager`` protobufs on the context's
    factory.
    """

    def __init__(self, seed: int = 1234) -> None:
        """Create the storage and manager, then register the manager builder.

        Args:
            seed: Passed through unchanged to ``Base.__init__``.
        """
        super().__init__(seed=seed)

        # The storage is built on an engine created from ``":memory:"``.
        self._storage = Storage(sqlite(":memory:"), self.factory())
        self._manager = swm.manager(self.storage())

        def build_manager(protobuf: t.Any) -> smb.Base:
            """Wrap a manager protobuf into a ``smb.Base`` on this storage."""
            # ``self.storage()`` is looked up at call time, not at
            # registration time.
            return smb.Base(self.storage(), t.cast(sp.Manager, protobuf))

        self.factory().register(type_name(sp.Manager), build_manager)

    def storage(self) -> Storage:
        """Return the storage created in ``__init__``."""
        return self._storage

    def manager(self) -> Manager:
        """Return the manager created in ``__init__``."""
        return self._manager