Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
faust-streaming / faust / types / assignor.py
Size: Mime:
import abc
import typing
from typing import Any, List, MutableMapping, Set

from mode import ServiceT
from yarl import URL

from .tuples import TP

if typing.TYPE_CHECKING:
    from .app import AppT as _AppT
else:

    class _AppT:
        ...  # noqa


__all__ = [
    "TopicToPartitionMap",
    "HostToPartitionMap",
    "PartitionAssignorT",
    "LeaderAssignorT",
]

TopicToPartitionMap = MutableMapping[str, List[int]]
HostToPartitionMap = MutableMapping[str, TopicToPartitionMap]


class PartitionAssignorT(abc.ABC):

    replicas: int
    app: _AppT

    @abc.abstractmethod
    def __init__(self, app: _AppT, replicas: int = 0) -> None:
        ...

    @abc.abstractmethod
    def group_for_topic(self, topic: str) -> int:
        ...

    @abc.abstractmethod
    def assigned_standbys(self) -> Set[TP]:
        ...

    @abc.abstractmethod
    def assigned_actives(self) -> Set[TP]:
        ...

    @abc.abstractmethod
    def is_active(self, tp: TP) -> bool:
        ...

    @abc.abstractmethod
    def is_standby(self, tp: TP) -> bool:
        ...

    @abc.abstractmethod
    def key_store(self, topic: str, key: bytes) -> URL:
        ...

    @abc.abstractmethod
    def table_metadata(self, topic: str) -> HostToPartitionMap:
        ...

    @abc.abstractmethod
    def tables_metadata(self) -> HostToPartitionMap:
        ...


class LeaderAssignorT(ServiceT):

    app: _AppT

    @abc.abstractmethod
    def __init__(self, app: _AppT, **kwargs: Any) -> None:
        ...

    @abc.abstractmethod
    def is_leader(self) -> bool:
        ...