Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
faust-streaming / faust / agents / models.py
Size: Mime:
"""Models used by agents internally."""
from typing import Any

from faust.models import Record
from faust.types import ModelT

__all__ = ["ReqRepRequest", "ReqRepResponse"]


class ReqRepRequest(
    Record,
    serializer="json",
    namespace="@ReqRepRequest",  # internal namespace
    # any stream should allow this type
    # to wrap other values.
    polymorphic_fields=True,
):
    """Value wrapped in a Request-Reply request."""

    # agent.ask(value) wraps the value in this record
    # so that the receiving agent knows where to send the reply.

    value: Any
    reply_to: str
    correlation_id: str


class ModelReqRepRequest(ReqRepRequest):
    """Request-Reply request where value is a model."""

    value: ModelT


class ReqRepResponse(Record, serializer="json", namespace="@ReqRepResponse"):
    """Request-Reply response."""

    key: Any
    value: Any
    correlation_id: str


class ModelReqRepResponse(ReqRepResponse):
    value: Any