# Repository URL to install this package: (URL missing from this copy)
# Version: 4.5.4.dev1
import tempfile
import typing as t
from sarus_data_spec.context.base import Base
from sarus_data_spec.manager.typing import Manager
from sarus_data_spec.protobuf import type_name
from sarus_data_spec.storage.sql import Storage, sqlite
import sarus_data_spec.manager.api_manager as sam
import sarus_data_spec.manager.base as smb
import sarus_data_spec.manager.worker_manager as worker_man
import sarus_data_spec.protobuf as sp
class ApiContext(Base):
    """Context that bundles a storage and an API manager.

    On construction it:

    * builds a `Storage` on top of the engine returned by `sqlite` for a
      `temp.db` file located in the system temporary directory;
    * creates the manager through `sam.manager`, handing it a worker
      manager built on the same storage;
    * registers, on the context factory, a builder that turns
      `sp.Manager` protobufs into `smb.Base` objects.
    """

    def __init__(self, seed: int = 1234) -> None:
        """Set up the storage, the manager and the factory registration.

        Args:
            seed: forwarded unchanged to the `Base` initializer.
        """
        super().__init__(seed=seed)

        # TODO: make it better
        db_path = f"{tempfile.gettempdir()}/temp.db"
        self._storage = Storage(sqlite(db_path), self.factory())

        # The worker manager and the API manager share the same storage.
        worker = worker_man.manager(self.storage())
        self._manager = sam.manager(self.storage(), worker)

        def build_manager(protobuf: t.Any) -> smb.Base:
            # The storage is looked up when the builder is invoked,
            # not when it is registered.
            return smb.Base(self.storage(), t.cast(sp.Manager, protobuf))

        self.factory().register(type_name(sp.Manager), build_manager)

    def storage(self) -> Storage:
        """Return the storage created in `__init__`."""
        return self._storage

    def manager(self) -> Manager:
        """Return the manager created in `__init__`."""
        return self._manager