Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
Size: Mime:
import tempfile
import typing as t

from sarus_data_spec.context.base import Base
from sarus_data_spec.manager.typing import Manager
from sarus_data_spec.protobuf import type_name
from sarus_data_spec.storage.sql import Storage, sqlite
import sarus_data_spec.manager.api_manager as sam
import sarus_data_spec.manager.base as smb
import sarus_data_spec.manager.worker_manager as worker_man
import sarus_data_spec.protobuf as sp


class ApiContext(Base):
    """Context that wires a storage, an API manager and a manager builder.

    On construction it creates a ``Storage`` from the result of ``sqlite``
    called on a ``temp.db`` path under the system temporary directory,
    builds the API manager on top of a worker manager, and registers a
    builder for ``sp.Manager`` protobufs on the context's factory.
    """

    def __init__(self, seed: int = 1234) -> None:
        """Initialise the context.

        Args:
            seed: Passed through unchanged to the parent ``Base`` initialiser.
        """
        super().__init__(seed=seed)

        # TODO: make it better
        db_path = f"{tempfile.gettempdir()}/temp.db"
        self._storage = Storage(sqlite(db_path), self.factory())

        # The API manager receives the storage plus a worker manager that is
        # built on the same storage.
        api_storage = self.storage()
        delegate = worker_man.manager(self.storage())
        self._manager = sam.manager(api_storage, delegate)

        def build_manager(protobuf: t.Any) -> smb.Base:
            """Wrap a manager protobuf into a ``smb.Base`` on this storage."""
            return smb.Base(self.storage(), t.cast(sp.Manager, protobuf))

        self.factory().register(type_name(sp.Manager), build_manager)

    def storage(self) -> Storage:
        """Return the storage created at construction time."""
        return self._storage

    def manager(self) -> Manager:
        """Return the API manager created at construction time."""
        return self._manager