Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
faust-streaming / faust / sensors / prometheus.py
Size: Mime:
"""Monitor using Prometheus."""
import time
import typing
from typing import Any, NamedTuple, cast

from aiohttp.web import Response
from prometheus_client.utils import INF

from faust import web, web as _web
from faust.exceptions import ImproperlyConfigured
from faust.types import (
    TP,
    AppT,
    CollectionT,
    EventT,
    Message,
    PendingMessage,
    RecordMetadata,
    StreamT,
)
from faust.types.assignor import PartitionAssignorT
from faust.types.transports import ConsumerT, ProducerT

from .monitor import Monitor, TPOffsetMapping

try:
    import prometheus_client
    from prometheus_client import (
        CONTENT_TYPE_LATEST,
        REGISTRY,
        CollectorRegistry,
        Counter,
        Gauge,
        Histogram,
        generate_latest,
    )
except ImportError:  # pragma: no cover
    prometheus_client = None


# Public API: only the setup helper is exported by ``from ... import *``.
__all__ = ["setup_prometheus_sensors"]


def setup_prometheus_sensors(
    app: AppT,
    pattern: str = "/metrics",
    registry: CollectorRegistry = REGISTRY,
    name_prefix: typing.Optional[str] = None,
) -> None:
    """Install the Prometheus monitor on ``app`` and expose a metrics page.

    Creates all metrics on ``registry``, replaces ``app.monitor`` with a
    :class:`PrometheusMonitor` using them, and registers a web page at
    ``pattern`` that serves the registry in the Prometheus text format.

    Arguments:
        app: Application to instrument.
        pattern: URL pattern of the metrics page.
        registry: Collector registry the metrics are registered on and
            that the metrics page serialises.
        name_prefix: Prefix for every metric name; falls back to
            ``app.conf.name`` when empty or ``None``.

    Raises:
        ImproperlyConfigured: If ``prometheus_client`` is not available.
    """
    if prometheus_client is None:
        raise ImproperlyConfigured(
            "prometheus_client requires `pip install prometheus_client`."
        )

    faust_metrics = FaustMetrics.create(registry, name_prefix or app.conf.name)
    app.monitor = PrometheusMonitor(metrics=faust_metrics)

    @app.page(pattern)
    async def metrics_handler(self: _web.View, request: _web.Request) -> _web.Response:
        headers = {"Content-Type": CONTENT_TYPE_LATEST}

        # Serialise the registry the metrics were created on, not the global
        # default, so that a custom ``registry`` argument is actually exposed.
        return cast(
            _web.Response,
            Response(body=generate_latest(registry), headers=headers, status=200),
        )


# Histogram bucket upper bounds for the metrics that FaustMetrics.create
# documents in seconds (event receive lag and end-to-end lag).
# INF (imported from prometheus_client.utils) is the last bound.
BUCKETS_S = (
    0.001,
    0.005,
    0.010,
    0.025,
    0.050,
    0.075,
    0.100,
    0.250,
    0.500,
    0.750,
    1.0,
    2.5,
    5.0,
    7.5,
    10.0,
    20.0,
    30.0,
    600.0,
    INF,
)
# Histogram bucket upper bounds for the metrics that FaustMetrics.create
# documents in milliseconds (runtime, producer, assignment, rebalance,
# HTTP and commit latencies).
# INF (imported from prometheus_client.utils) is the last bound.
BUCKETS_MS = (
    0.001,
    0.005,
    0.010,
    0.050,
    0.100,
    0.500,
    1.0,
    2.5,
    5.0,
    7.5,
    10.0,
    25.0,
    50.0,
    75.0,
    100.0,
    250.0,
    500.0,
    1000.0,
    INF,
)


class FaustMetrics(NamedTuple):
    """Bundle of every Prometheus metric updated by the Prometheus monitor.

    Use :meth:`create` to build an instance whose metrics are all
    registered on one registry and share a common name prefix.
    """

    # Messages and events consumed
    messages_received: Counter
    active_messages: Gauge
    messages_received_per_topics: Counter
    messages_received_per_topics_partition: Gauge
    events_runtime_latency: Histogram
    events_receive_latency: Histogram
    events_end2end_latency: Histogram

    # Events entering streams
    total_events: Counter
    total_active_events: Gauge
    total_events_per_stream: Counter

    # Table key get/set/del operations
    table_operations: Counter

    # Messages produced
    topic_messages_sent: Counter
    total_sent_messages: Counter
    producer_send_latency: Histogram
    total_error_messages_sent: Counter
    producer_error_send_latency: Histogram

    # Partition assignment
    assignment_operations: Counter
    assign_latency: Histogram

    # Rebalances
    total_rebalances: Gauge
    total_rebalances_recovering: Gauge
    rebalance_done_consumer_latency: Histogram
    rebalance_done_latency: Histogram

    # Free-form metrics counted by name
    count_metrics_by_name: Gauge

    # Web requests
    http_status_codes: Counter
    http_latency: Histogram

    # Topic/partition offsets
    topic_partition_end_offset: Gauge
    topic_partition_offset_commited: Gauge
    consumer_commit_latency: Histogram

    @classmethod
    def create(cls, registry: CollectorRegistry, app_name: str) -> "FaustMetrics":
        """Build all metrics on ``registry``, naming them ``{app_name}_...``.

        Keyword arguments are evaluated top to bottom, so the metrics are
        registered in the order in which they are listed here.
        """
        return cls(
            messages_received=Counter(
                f"{app_name}_messages_received",
                "Total messages received",
                registry=registry,
            ),
            active_messages=Gauge(
                f"{app_name}_active_messages",
                "Total active messages",
                registry=registry,
            ),
            messages_received_per_topics=Counter(
                f"{app_name}_messages_received_per_topic",
                "Messages received per topic",
                ["topic"],
                registry=registry,
            ),
            messages_received_per_topics_partition=Gauge(
                f"{app_name}_messages_received_per_topics_partition",
                "Messages received per topic/partition",
                ["topic", "partition"],
                registry=registry,
            ),
            events_runtime_latency=Histogram(
                f"{app_name}_events_runtime_ms",
                "Events runtime in ms",
                registry=registry,
                buckets=BUCKETS_MS,
            ),
            events_receive_latency=Histogram(
                f"{app_name}_events_receive_s",
                "Events receive lag in second",
                registry=registry,
                buckets=BUCKETS_S,
            ),
            events_end2end_latency=Histogram(
                f"{app_name}_events_end2end_s",
                "Events end2end lag in second",
                registry=registry,
                buckets=BUCKETS_S,
            ),
            total_events=Counter(
                f"{app_name}_total_events",
                "Total events received",
                registry=registry,
            ),
            total_active_events=Gauge(
                f"{app_name}_total_active_events",
                "Total active events",
                registry=registry,
            ),
            total_events_per_stream=Counter(
                f"{app_name}_total_events_per_stream",
                "Events received per Stream",
                ["stream"],
                registry=registry,
            ),
            table_operations=Counter(
                f"{app_name}_table_operations",
                "Total table operations",
                ["table", "operation"],
                registry=registry,
            ),
            topic_messages_sent=Counter(
                f"{app_name}_topic_messages_sent",
                "Total messages sent per topic",
                ["topic"],
                registry=registry,
            ),
            total_sent_messages=Counter(
                f"{app_name}_total_sent_messages",
                "Total messages sent",
                registry=registry,
            ),
            producer_send_latency=Histogram(
                f"{app_name}_producer_send_latency",
                "Producer send latency in ms",
                registry=registry,
                buckets=BUCKETS_MS,
            ),
            total_error_messages_sent=Counter(
                f"{app_name}_total_error_messages_sent",
                "Total error messages sent",
                registry=registry,
            ),
            producer_error_send_latency=Histogram(
                f"{app_name}_producer_error_send_latency",
                "Producer error send latency in ms",
                registry=registry,
                buckets=BUCKETS_MS,
            ),
            assignment_operations=Counter(
                f"{app_name}_assignment_operations",
                "Total assigment operations (completed/error)",
                ["operation"],
                registry=registry,
            ),
            assign_latency=Histogram(
                f"{app_name}_assign_latency",
                "Assignment latency in ms",
                registry=registry,
                buckets=BUCKETS_MS,
            ),
            total_rebalances=Gauge(
                f"{app_name}_total_rebalances",
                "Total rebalances",
                registry=registry,
            ),
            total_rebalances_recovering=Gauge(
                f"{app_name}_total_rebalances_recovering",
                "Total rebalances recovering",
                registry=registry,
            ),
            rebalance_done_consumer_latency=Histogram(
                f"{app_name}_rebalance_done_consumer_latency",
                "Consumer replying that rebalance is done to broker in ms",
                registry=registry,
                buckets=BUCKETS_MS,
            ),
            rebalance_done_latency=Histogram(
                f"{app_name}_rebalance_done_latency",
                "Rebalance finished latency in ms",
                registry=registry,
                buckets=BUCKETS_MS,
            ),
            count_metrics_by_name=Gauge(
                f"{app_name}_metrics_by_name",
                "Total metrics by name",
                ["metric"],
                registry=registry,
            ),
            http_status_codes=Counter(
                f"{app_name}_http_status_codes",
                "Total http_status code",
                ["status_code"],
                registry=registry,
            ),
            http_latency=Histogram(
                f"{app_name}_http_latency",
                "Http response latency in ms",
                registry=registry,
                buckets=BUCKETS_MS,
            ),
            topic_partition_end_offset=Gauge(
                f"{app_name}_topic_partition_end_offset",
                "Offset ends per topic/partition",
                ["topic", "partition"],
                registry=registry,
            ),
            topic_partition_offset_commited=Gauge(
                f"{app_name}_topic_partition_offset_commited",
                "Offset commited per topic/partition",
                ["topic", "partition"],
                registry=registry,
            ),
            consumer_commit_latency=Histogram(
                f"{app_name}_consumer_commit_latency",
                "Consumer commit latency in ms",
                registry=registry,
                buckets=BUCKETS_MS,
            ),
        )

    def clear_topic_related_metrics(self) -> None:
        """Remove every labelled series keyed by topic or topic/partition."""
        self._clear_topic_partition_related_metrics()
        self._clear_topic_related_metrics()

    def _clear_topic_partition_related_metrics(self) -> None:
        """Remove all (topic, partition) series from the per-partition metrics."""
        for partition_metric in (
            self.messages_received_per_topics_partition,
            self.topic_partition_end_offset,
            self.topic_partition_offset_commited,
        ):
            samples = partition_metric.collect()[0].samples
            # Collect the label pairs first; a pair may appear in several
            # samples but must only be removed once.
            label_pairs = frozenset(
                (s.labels["topic"], s.labels["partition"]) for s in samples
            )
            for label_values in label_pairs:
                partition_metric.remove(*label_values)

    def _clear_topic_related_metrics(self) -> None:
        """Remove all per-topic series from the topic-labelled counters."""
        for topic_metric in (
            self.messages_received_per_topics,
            self.topic_messages_sent,
        ):
            samples = topic_metric.collect()[0].samples
            # De-duplicate: one topic can back more than one sample.
            topic_names = frozenset(s.labels["topic"] for s in samples)
            for topic_name in topic_names:
                topic_metric.remove(topic_name)


class PrometheusMonitor(Monitor):
    """
    Prometheus Faust Sensor.

    This sensor records statistics using prometheus_client and exposes
    them using the aiohttp server running under /metrics by default.

    Usage:
        import faust
        from faust.sensors.prometheus import setup_prometheus_sensors

        app = faust.App('example', broker='kafka://')
        setup_prometheus_sensors(
            app, pattern='/metrics', name_prefix='example_app_name'
        )
    """

    # Label values for the "operation" label of the assignment counter.
    ERROR = "error"
    COMPLETED = "completed"
    # Label values for the "operation" label of the table counter.
    KEYS_RETRIEVED = "keys_retrieved"
    KEYS_UPDATED = "keys_updated"
    KEYS_DELETED = "keys_deleted"

    # Prefix removed from a stream's short label before it is normalized.
    _STREAM_LABEL_PREFIX = "Stream:"

    def __init__(self, metrics: FaustMetrics, **kwargs: Any) -> None:
        """Store ``metrics``; remaining keyword arguments go to ``Monitor``."""
        super().__init__(**kwargs)
        self._metrics = metrics

    def on_message_in(self, tp: TP, offset: int, message: Message) -> None:
        """Call before message is delegated to streams."""
        super().on_message_in(tp, offset, message)

        self._metrics.messages_received.inc()
        self._metrics.active_messages.inc()
        self._metrics.messages_received_per_topics.labels(topic=tp.topic).inc()
        # This gauge is set to the offset of the message, not incremented.
        self._metrics.messages_received_per_topics_partition.labels(
            topic=tp.topic, partition=tp.partition
        ).set(offset)

    def on_stream_event_in(
        self, tp: TP, offset: int, stream: StreamT, event: EventT
    ) -> typing.Optional[typing.Dict]:
        """Call when stream starts processing an event.

        Returns the state produced by the base monitor unchanged.
        """
        state = super().on_stream_event_in(tp, offset, stream, event)
        self._metrics.total_events.inc()
        self._metrics.total_active_events.inc()
        self._metrics.total_events_per_stream.labels(
            stream=f"stream.{self._stream_label(stream)}.events"
        ).inc()

        now = time.time()
        self._metrics.events_receive_latency.observe(now - event.message.timestamp)

        # End-to-end latency is only observed when the message carries an
        # "originating-time" header; headers may be a mapping or an iterable
        # of (key, value) pairs.
        headers = event.message.headers
        if headers:
            if isinstance(headers, dict):
                event_orig_time = headers.get("originating-time")
            else:
                event_orig_time = next(
                    (v for k, v in headers if k == "originating-time"), None
                )

            if event_orig_time:
                self._metrics.events_end2end_latency.observe(
                    now - float(event_orig_time.decode("ascii"))
                )

        return state

    def _stream_label(self, stream: StreamT) -> str:
        """Return a normalized, lower-case label for ``stream``.

        A leading ``"Stream:"`` is removed as a prefix.  ``str.lstrip`` is
        deliberately not used: it strips a *set of characters*, so it would
        also eat leading characters of the remaining label that happen to be
        in ``"Stream:"``.
        """
        label = stream.shortlabel
        prefix = self._STREAM_LABEL_PREFIX
        if label.startswith(prefix):
            label = label[len(prefix):]
        return self._normalize(label).strip("_").lower()

    def on_stream_event_out(
        self,
        tp: TP,
        offset: int,
        stream: StreamT,
        event: EventT,
        state: typing.Optional[typing.Dict] = None,
    ) -> None:
        """Call when stream is done processing an event."""
        super().on_stream_event_out(tp, offset, stream, event, state)
        self._metrics.total_active_events.dec()
        if state is not None:
            # presumably the base monitor has just appended this event's
            # runtime to ``events_runtime`` — verify against Monitor.
            self._metrics.events_runtime_latency.observe(
                self.secs_to_ms(self.events_runtime[-1])
            )

    def on_message_out(self, tp: TP, offset: int, message: Message) -> None:
        """Call when message is fully acknowledged and can be committed."""
        super().on_message_out(tp, offset, message)
        self._metrics.active_messages.dec()

    def on_table_get(self, table: CollectionT, key: typing.Any) -> None:
        """Call when value in table is retrieved."""
        super().on_table_get(table, key)
        self._metrics.table_operations.labels(
            table=f"table.{table.name}", operation=self.KEYS_RETRIEVED
        ).inc()

    def on_table_set(
        self, table: CollectionT, key: typing.Any, value: typing.Any
    ) -> None:
        """Call when new value for key in table is set."""
        super().on_table_set(table, key, value)
        self._metrics.table_operations.labels(
            table=f"table.{table.name}", operation=self.KEYS_UPDATED
        ).inc()

    def on_table_del(self, table: CollectionT, key: typing.Any) -> None:
        """Call when key in a table is deleted."""
        super().on_table_del(table, key)
        self._metrics.table_operations.labels(
            table=f"table.{table.name}", operation=self.KEYS_DELETED
        ).inc()

    def on_commit_completed(self, consumer: ConsumerT, state: typing.Any) -> None:
        """Call when consumer commit offset operation completed."""
        super().on_commit_completed(consumer, state)
        # ``state`` is treated as the start time of the commit operation.
        self._metrics.consumer_commit_latency.observe(
            self.ms_since(typing.cast(float, state))
        )

    def on_send_initiated(
        self,
        producer: ProducerT,
        topic: str,
        message: PendingMessage,
        keysize: int,
        valsize: int,
    ) -> typing.Any:
        """Call when message added to producer buffer.

        Returns the state produced by the base monitor.
        """
        self._metrics.topic_messages_sent.labels(topic=f"topic.{topic}").inc()

        return super().on_send_initiated(producer, topic, message, keysize, valsize)

    def on_send_completed(
        self, producer: ProducerT, state: typing.Any, metadata: RecordMetadata
    ) -> None:
        """Call when producer finished sending message."""
        super().on_send_completed(producer, state, metadata)
        self._metrics.total_sent_messages.inc()
        # ``state`` is treated as the time the send was initiated.
        self._metrics.producer_send_latency.observe(
            self.ms_since(typing.cast(float, state))
        )

    def on_send_error(
        self, producer: ProducerT, exc: BaseException, state: typing.Any
    ) -> None:
        """Call when producer was unable to publish message."""
        super().on_send_error(producer, exc, state)
        self._metrics.total_error_messages_sent.inc()
        self._metrics.producer_error_send_latency.observe(
            self.ms_since(typing.cast(float, state))
        )

    def on_assignment_error(
        self, assignor: PartitionAssignorT, state: typing.Dict, exc: BaseException
    ) -> None:
        """Partition assignor did not complete assignor due to error."""
        super().on_assignment_error(assignor, state, exc)
        self._metrics.assignment_operations.labels(operation=self.ERROR).inc()
        self._metrics.assign_latency.observe(self.ms_since(state["time_start"]))

    def on_assignment_completed(
        self, assignor: PartitionAssignorT, state: typing.Dict
    ) -> None:
        """Partition assignor completed assignment."""
        super().on_assignment_completed(assignor, state)
        self._metrics.assignment_operations.labels(operation=self.COMPLETED).inc()
        self._metrics.assign_latency.observe(self.ms_since(state["time_start"]))

    def on_rebalance_start(self, app: AppT) -> typing.Dict:
        """Cluster rebalance in progress.

        Clears the topic/partition series and returns the base monitor state.
        """
        state = super().on_rebalance_start(app)
        self._clear_partition_related_metrics()
        self._metrics.total_rebalances.inc()

        return state

    def on_rebalance_return(self, app: AppT, state: typing.Dict) -> None:
        """Consumer replied assignment is done to broker."""
        super().on_rebalance_return(app, state)
        self._metrics.total_rebalances.dec()
        self._metrics.total_rebalances_recovering.inc()
        # NOTE(review): "time_return" is presumably filled in by the base
        # Monitor hook above — confirm that measuring from it (rather than
        # from the rebalance start) yields the intended latency.
        self._metrics.rebalance_done_consumer_latency.observe(
            self.ms_since(state["time_return"])
        )

    def on_rebalance_end(self, app: AppT, state: typing.Dict) -> None:
        """Cluster rebalance fully completed (including recovery)."""
        super().on_rebalance_end(app, state)
        self._clear_partition_related_metrics()
        self._metrics.total_rebalances_recovering.dec()
        # NOTE(review): same question as in on_rebalance_return, for
        # "time_end" — confirm against Monitor.
        self._metrics.rebalance_done_latency.observe(self.ms_since(state["time_end"]))

    def count(self, metric_name: str, count: int = 1) -> None:
        """Count metric by name."""
        super().count(metric_name, count=count)
        self._metrics.count_metrics_by_name.labels(metric=metric_name).inc(count)

    def on_tp_commit(self, tp_offsets: TPOffsetMapping) -> None:
        """Call when offset in topic partition is committed."""
        super().on_tp_commit(tp_offsets)
        for tp, offset in tp_offsets.items():
            self._metrics.topic_partition_offset_commited.labels(
                topic=tp.topic, partition=tp.partition
            ).set(offset)

    def track_tp_end_offset(self, tp: TP, offset: int) -> None:
        """Track new topic partition end offset for monitoring lags."""
        super().track_tp_end_offset(tp, offset)
        self._metrics.topic_partition_end_offset.labels(
            topic=tp.topic, partition=tp.partition
        ).set(offset)

    def on_web_request_end(
        self,
        app: AppT,
        request: web.Request,
        response: typing.Optional[web.Response],
        state: typing.Dict,
        *,
        view: typing.Optional[web.View] = None,
    ) -> None:
        """Web server finished working on request."""
        super().on_web_request_end(app, request, response, state, view=view)
        status_code = int(state["status_code"])
        self._metrics.http_status_codes.labels(status_code=status_code).inc()
        # NOTE(review): "time_end" is presumably set by the base Monitor hook
        # above — confirm this measures the request duration as intended.
        self._metrics.http_latency.observe(self.ms_since(state["time_end"]))

    def _clear_partition_related_metrics(self) -> None:
        """Drop topic/partition series and the monitor's per-partition offsets."""
        self._metrics.clear_topic_related_metrics()
        self.tp_committed_offsets.clear()
        self.tp_read_offsets.clear()
        self.tp_end_offsets.clear()