Repository URL to install this package:
|
Version:
0.6.9.post12 ▾
|
"""Attachments - Deprecated module.
Attachments were used before transaction support was available.
"""
import asyncio
import typing
from collections import defaultdict
from heapq import heappop, heappush
from typing import (
Awaitable,
Iterator,
List,
MutableMapping,
NamedTuple,
Union,
cast,
)
from mode.utils.objects import Unordered, cached_property
from faust.streams import current_event
from faust.types import TP, AppT, ChannelT, CodecArg, RecordMetadata, SchemaT
from faust.types.core import HeadersArg, K, V
from faust.types.tuples import FutureMessage, Message, MessageSentCallback
# ``_Event`` is only needed for static typing (it is the target of a
# ``cast``).  At runtime a bare placeholder class is defined instead of
# importing ``faust.events``; the real class is imported for type checkers only.
if not typing.TYPE_CHECKING:

    class _Event:
        ...  # noqa

else:
    from faust.events import Event as _Event
__all__ = ["Attachment", "Attachments"]
class Attachment(NamedTuple):
    """Message attached to offset in source topic.

    The message will be published once that offset in the source
    topic is committed.
    """

    # Tuple used in heapq entry for Attachments._pending
    # These are used to delay producing of messages until source offset is
    # committed:
    #
    #   @app.agent(source_topic)
    #   async def process(stream):
    #       async for value in stream:
    #           await other_topic.send(value)  # does not send here!
    #
    # Sending of the message is attached to the offset of value in the source
    # topic, so that when the event is acked, only then do we send the
    # message.  This gives better consistency: if we reprocess the event
    # we don't send messages twice.
    #
    # Note though: we need Kafka transactions to cover all cases of
    # inconsistencies.

    # Offset of the source message this attachment belongs to; being the
    # first tuple field, it is what heap entries are ordered by.
    offset: int
    # The pending message, wrapped in ``Unordered`` before being pushed
    # onto the heap.
    message: Unordered[FutureMessage]
class Attachments:
    """Attachments manager.

    Tracks messages produced while a source message is being processed,
    so that they are published only when the offset of that source
    message is committed.
    """

    app: AppT

    # Mapping used to attach messages to a source message such that
    # only when the source message is acked, only then do we publish
    # its attached messages.
    #
    # The mapping maintains one list for each TopicPartition,
    # where the lists are used as heap queues containing tuples
    # of ``(source_message_offset, FutureMessage)``.
    _pending: MutableMapping[TP, List[Attachment]]

    def __init__(self, app: AppT) -> None:
        """Create a manager bound to ``app`` with no pending attachments."""
        self.app = app
        self._pending = defaultdict(list)

    @cached_property
    def enabled(self) -> bool:
        """Return :const:`True` if attachments are enabled."""
        # NOTE: currently hard-coded.  The disabled alternative was:
        #   return self.app.conf.stream_publish_on_commit
        return True

    async def maybe_put(
        self,
        channel: Union[ChannelT, str],
        key: K = None,
        value: V = None,
        partition: int = None,
        timestamp: float = None,
        headers: HeadersArg = None,
        schema: SchemaT = None,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
        callback: MessageSentCallback = None,
    ) -> Awaitable[RecordMetadata]:
        """Associate produced message to current event's source topic offset.

        If there is a current event, sending is delegated to that event's
        ``send``.  Otherwise the message is handed to the channel's
        ``send_soon``; a channel given by name is first resolved with
        ``app.topic``.

        Returns:
            The awaitable returned by the underlying send call.
        """
        # XXX The concept of attaching should be deprecated when we
        # have Kafka transaction support (:kip:`KIP-98`).
        # This is why the interface related to attaching is private.
        event = current_event()
        # attach message to current event if there is one.
        if event is not None:
            return cast(_Event, event).send(
                channel=channel,
                key=key,
                value=value,
                partition=partition,
                timestamp=timestamp,
                headers=headers,
                schema=schema,
                key_serializer=key_serializer,
                value_serializer=value_serializer,
                callback=callback,
            )
        # ``channel`` may be a topic name: resolve it the same way ``put``
        # does, since a plain ``str`` has no ``send_soon``.
        chan = self.app.topic(channel) if isinstance(channel, str) else channel
        return chan.send_soon(
            key=key,
            value=value,
            partition=partition,
            timestamp=timestamp,
            headers=headers,
            schema=schema,
            key_serializer=key_serializer,
            value_serializer=value_serializer,
            callback=callback,
        )

    def put(
        self,
        message: Message,
        channel: Union[str, ChannelT],
        key: K,
        value: V,
        partition: int = None,
        timestamp: float = None,
        headers: HeadersArg = None,
        schema: SchemaT = None,
        key_serializer: CodecArg = None,
        value_serializer: CodecArg = None,
        callback: MessageSentCallback = None,
    ) -> Awaitable[RecordMetadata]:
        """Attach message to source topic offset.

        The message is created with the channel's ``send_soon`` and then
        recorded against ``message.tp`` / ``message.offset``.

        Returns:
            The object returned by ``send_soon`` for the new message.
        """
        # This attaches message to be published when source message is
        # acknowledged.  To be replaced by transactions in :kip:`KIP-98`.

        # get heap queue for this TopicPartition
        # items in this list are ``(source_offset, Unordered[FutureMessage])``
        # tuples.
        buf = self._pending[message.tp]
        chan = self.app.topic(channel) if isinstance(channel, str) else channel
        fut = chan.send_soon(
            key=key,
            value=value,
            partition=partition,
            timestamp=timestamp,
            headers=headers,
            schema=schema,
            key_serializer=key_serializer,
            value_serializer=value_serializer,
            callback=callback,
        )
        # Note: the FutureMessage is wrapped in an Unordered object to stop
        # heappush from crashing when heap entries are compared.
        # Unordered simply orders by random order, which is fine
        # since offsets are always unique.
        heappush(buf, Attachment(message.offset, Unordered(fut)))
        return fut

    def put_fut(self, message: Message, future_message: FutureMessage,) -> None:
        """Attach an already created message to source topic offset."""
        buf = self._pending[message.tp]
        heappush(buf, Attachment(message.offset, Unordered(future_message)))

    async def commit(self, tp: TP, offset: int) -> None:
        """Publish all messages attached to topic partition and offset.

        Waits until every published message has completed.  Does nothing
        when no message is attached at or below ``offset``.
        """
        # ``asyncio.wait`` needs futures/tasks (bare coroutines are rejected
        # on newer Python versions), so normalize every awaitable first.
        pending = [
            asyncio.ensure_future(fut)
            for fut in await self.publish_for_tp_offset(tp, offset)
        ]
        if not pending:
            # ``asyncio.wait`` raises ValueError for an empty collection.
            return
        # No explicit ``loop`` argument: it was removed from ``asyncio.wait``
        # in Python 3.10, and the running loop is used anyway.
        await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)

    async def publish_for_tp_offset(
        self, tp: TP, offset: int
    ) -> List[Awaitable[RecordMetadata]]:
        """Publish messages attached to topic partition and offset.

        Returns:
            One awaitable per published message, as returned by the
            channel's ``publish_message(..., wait=False)``.
        """
        # publish pending messages attached to this TP+offset
        # make shallow copy to allow concurrent modifications (append)
        attached = list(self.attachments_for(tp, offset))
        return [
            await fut.message.channel.publish_message(fut, wait=False)
            for fut in attached
        ]

    def attachments_for(self, tp: TP, commit_offset: int) -> Iterator[FutureMessage]:
        """Yield attached messages for ``tp`` within the committed offset.

        Entries whose offset is smaller than or equal to ``commit_offset``
        are removed from the pending heap and yielded in offset order;
        entries with a larger offset stay in the heap.
        """
        attached = self._pending.get(tp)
        # The heap root is always the entry with the smallest offset, so
        # stop as soon as it lies beyond the offset being committed.
        while attached and attached[0].offset <= commit_offset:
            # entry.message is Unordered[FutureMessage]; unwrap it.
            yield heappop(attached).message.value