Source code for fedora_messaging.twisted.consumer

# SPDX-FileCopyrightText: 2024 Red Hat, Inc
#
# SPDX-License-Identifier: GPL-2.0-or-later

import asyncio
import logging
import uuid
from collections.abc import Coroutine, Generator
from typing import Any, Callable, cast, Optional, TYPE_CHECKING, Union

import pika
import pika.exceptions
from pika.adapters.twisted_connection import ClosableDeferredQueue, TwistedChannel
from twisted.internet import defer, error, reactor, threads
from twisted.python.failure import Failure

from .. import config
from ..exceptions import (
    ConnectionException,
    ConsumerCanceled,
    Drop,
    HaltConsumer,
    Nack,
    PermissionException,
    ValidationError,
)
from ..message import get_message
from .stats import ConsumerStatistics


if TYPE_CHECKING:
    import pika.frame
    from typing_extensions import TypeAlias, TypeGuard

    from .protocol import FedoraMessagingProtocolV2

_std_log = logging.getLogger(__name__)

QueueContent: "TypeAlias" = tuple[
    TwistedChannel, pika.spec.Basic.Deliver, pika.BasicProperties, bytes
]


def is_coro(
    func_or_obj: config.CallbackType,
) -> "TypeGuard[Callable[..., Coroutine[Any, Any, Any]]]":
    """Tests if a function is a coroutine function or a callable coroutine object."""
    # Until Python 3.10, inspect.iscoroutinefunction() will fail to identify AsyncMocks
    # as coroutines: https://github.com/python/cpython/issues/84753
    # Use asyncio.iscoroutinefunction() instead.
    return asyncio.iscoroutinefunction(func_or_obj) or (
        callable(func_or_obj) and asyncio.iscoroutinefunction(func_or_obj.__call__)  # type: ignore
    )


[docs] class Consumer: """ Represents a Twisted AMQP consumer and is returned from the call to :func:`fedora_messaging.api.twisted_consume`. Attributes: queue (str): The AMQP queue this consumer is subscribed to. callback (callable): The callback to run when a message arrives. result (twisted.internet.defer.Deferred[Consumer]): A deferred that runs the callbacks if the consumer exits gracefully after being canceled by a call to :meth:`Consumer.cancel` and errbacks if the consumer stops for any other reason. The reasons a consumer could stop are: a :class:`.PermissionException` if the consumer does not have permissions to read from the queue it is subscribed to, a :class:`.HaltConsumer` is raised by the consumer indicating it wishes to halt, an unexpected :class:`Exception` is raised by the consumer, or if the consumer is canceled by the server which happens if the queue is deleted by an administrator or if the node the queue lives on fails. """ def __init__(self, queue: Optional[str] = None, callback: Optional[config.CallbackType] = None): self.queue = queue self.callback = callback self.result: defer.Deferred[Consumer] = defer.Deferred() # The current channel used by this consumer. self._channel: Optional[TwistedChannel] = None # The unique ID for the AMQP consumer. self._tag = str(uuid.uuid4()) # Used in the consumer read loop to know when it's being canceled. self._running = False # The current read loop self._read_loop = defer.succeed(None) # The protocol that currently runs this consumer, used when cancel is # called to remove itself from the protocol and its factory so it doesn't # restart on the next connection. self._protocol: Union[FedoraMessagingProtocolV2, None] = None # Message statistics self.stats = ConsumerStatistics() def __repr__(self) -> str: return f"Consumer(queue={self.queue}, callback={self.callback})" @property def running(self) -> bool: """Whether the consumer is running.""" return self._running @defer.inlineCallbacks def consume(self) -> Generator[defer.Deferred[Any], Any]: if self._channel is None: raise RuntimeError("No channel, open one first") if self.queue is None: raise RuntimeError("No queue name defined") yield self._channel.basic_qos( prefetch_count=config.conf["qos"]["prefetch_count"], prefetch_size=config.conf["qos"]["prefetch_size"], ) try: queue_object: ClosableDeferredQueue queue_object, _ = yield self._channel.basic_consume( queue=self.queue, consumer_tag=self._tag ) except pika.exceptions.ChannelClosed as exc: if exc.args[0] == 403: raise PermissionException( obj_type="queue", description=self.queue, reason=exc.args[1] ) from exc else: raise ConnectionException(reason=exc) from exc self._running = True self._read_loop = self._read(queue_object) self._read_loop.addErrback(self._read_loop_errback) @defer.inlineCallbacks def _read(self, queue_object: ClosableDeferredQueue) -> Generator[defer.Deferred[None]]: """ The loop that reads from the message queue and calls the callback wrapper. Serialized Processing --------------------- This loop processes messages serially. This is because a second ``queue_object.get()`` operation can only occur after the Deferred from the callback completes. Thus, we can be sure that callbacks never run concurrently in two different threads. This is done rather than saturating the Twisted thread pool as the documentation for callbacks (in fedmsg and here) has never indicated that they are not thread-safe. In the future we can add a flag for users who are confident in their ability to write thread-safe code. Gracefully Halting ------------------ This is a loop that only exits when the ``self._running`` variable is set to False. The call to cancel will set this to false, and will then wait for the Deferred from this function to call back in order to ensure the message finishes processing. The Deferred object only completes when this method returns, so we need to periodically check the status of ``self._running``. That's why there's a short timeout on the call to ``queue_object.get``. Args: queue_object: The AMQP queue the consumer is bound to. """ while self._running: yield self._read_one(queue_object) @defer.inlineCallbacks def _read_one(self, queue_object: ClosableDeferredQueue) -> Generator[ defer.Deferred[Any], QueueContent, Any, ]: try: deferred_get = queue_object.get() # Type ignored because of https://github.com/twisted/twisted/issues/9909 deferred_get.addTimeout(1, reactor) # type: ignore channel, delivery_frame, properties, body = yield deferred_get except (defer.TimeoutError, defer.CancelledError): return _std_log.debug( "Message arrived with delivery tag %s for %r", delivery_frame.delivery_tag, self._tag, ) try: topic: str = delivery_frame.routing_key or "" message = get_message(topic, properties, body) message.queue = self.queue except ValidationError: _std_log.warning( "Message id %s did not pass validation; ignoring message", properties.message_id, ) channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=False) return self.stats.received += 1 try: _std_log.info( "Consuming message from topic %s (message id %s)", message.topic, properties.message_id, ) if self.callback is not None and is_coro(self.callback): d = defer.Deferred.fromFuture(asyncio.ensure_future(self.callback(message))) else: d = threads.deferToThread(self.callback, message) yield d except Nack: _std_log.warning("Returning message id %s to the queue", properties.message_id) channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=True) self.stats.rejected += 1 except Drop: _std_log.warning("Consumer requested message id %s be dropped", properties.message_id) channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=False) self.stats.dropped += 1 except HaltConsumer as e: _std_log.info("Consumer indicated it wishes consumption to halt, shutting down") if e.requeue: channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=True) self.stats.rejected += 1 else: channel.basic_ack(delivery_tag=delivery_frame.delivery_tag or 0) self.stats.processed += 1 raise e except Exception as e: _std_log.exception("Received unexpected exception from consumer %r", self) channel.basic_nack(delivery_tag=0, multiple=True, requeue=True) self.stats.failed += 1 raise e else: _std_log.info( "Successfully consumed message from topic %s (message id %s)", message.topic, properties.message_id, ) channel.basic_ack(delivery_tag=delivery_frame.delivery_tag or 0) self.stats.processed += 1 def _on_cancel_callback( self, frame: Optional["pika.frame.Method[pika.spec.Basic.Cancel]"] ) -> None: """ Called when the consumer is canceled server-side. This can happen, for example, when the queue is deleted. To handle this, we do the necessary book-keeping to remove the consumer and then fire the errback on the consumer so the caller of :func:`fedora_messaging.api.consume` can decide what to do. Args: frame: The cancel method from the server, unused here because we already know what consumer is being canceled. """ _std_log.error("%r was canceled by the AMQP broker!", self) if self._protocol is not None: self._protocol._forget_consumer(self.queue) self._running = False self.result.errback(fail=ConsumerCanceled()) def _read_loop_errback(self, failure: Failure) -> None: """ Handle errors coming out of the read loop. There are two basic categories of errors: ones where the ``consumer.result`` Deferred needs to be fired because the error is not recoverable, ones where we can recover from by letting the connection restart, and ones which are fatal for this consumer only (the queue was deleted by an administrator). Args: failure: The exception raised by the read loop encapsulated in a Failure. """ if failure.check(pika.exceptions.ConsumerCancelled): # Pika 1.0.0+ raises this exception. To support previous versions # we register a callback (called below) ourselves with the channel. self._on_cancel_callback(None) elif failure.check(pika.exceptions.ChannelClosed): failure.value = cast(pika.exceptions.ChannelClosed, failure.value) if failure.value.args[0] == 403: # This is a mis-configuration, the consumer can register itself, # but it doesn't have permissions to read from the queue, # so no amount of restarting will help. e = PermissionException( obj_type="queue", description=self.queue, reason=failure.value.args[1], ) self.result.errback(Failure(e, PermissionException)) self.cancel() else: _std_log.exception( "Consumer halted (%r) unexpectedly; the connection should restart.", failure, ) elif failure.check(error.ConnectionDone, error.ConnectionLost): _std_log.warning( "The connection to the broker was lost (%r), consumer halted; " "the connection should restart and consuming will resume.", failure.value, ) elif failure.check(pika.exceptions.ChannelWrongStateError): _std_log.warning( "The channel was closed by the server, you may have to increase the " "consumer_timeout configuration on RabbitMQ and/or decrease qos.prefetch_count " "in the fedora-messaging configuration file. Consuming will resume, starting " "with the last processed message again." ) self._restart_consuming() elif failure.check(pika.exceptions.AMQPError): _std_log.exception( "An unexpected AMQP error occurred; the connection should " "restart, but please report this as a bug." ) else: self.result.errback(failure) self.cancel() @defer.inlineCallbacks def _restart_consuming(self): if self._protocol is None: # pragma: no cover return self._channel = yield self._protocol._allocate_channel() yield self.consume()
[docs] @defer.inlineCallbacks def cancel(self) -> Generator[defer.Deferred[Any], Any]: """ Cancel the consumer and clean up resources associated with it. Consumers that are canceled are allowed to finish processing any messages before halting. Returns: A deferred that fires when the consumer has finished processing any message it was in the middle of and has been successfully canceled. """ if self._protocol is None or self._channel is None: return # Remove it from protocol and factory so it doesn't restart later. try: self._protocol._forget_consumer(self.queue) except AttributeError: pass # Signal to the _read loop it's time to stop and wait for it to finish # with whatever message it might be working on, then wait for the deferred # to fire which indicates it is done. self._running = False yield self._read_loop try: # TODO: bug in types-pika yield cast( defer.Deferred["pika.frame.Method[pika.spec.Basic.CancelOk]"], self._channel.basic_cancel(consumer_tag=self._tag), ) except pika.exceptions.AMQPChannelError: # Consumers are tied to channels, so if this channel is dead the # consumer should already be canceled (and we can't get to it anyway) pass try: # TODO: bug in pika, TwistedChannel.close() should return a Deferred # (TwistedChannel.on_closed) yield defer.maybeDeferred(self._channel.close) except pika.exceptions.AMQPChannelError: pass if not self.result.called: self.result.callback(self)