# SPDX-FileCopyrightText: 2024 Red Hat, Inc
#
# SPDX-License-Identifier: GPL-2.0-or-later
import asyncio
import logging
import uuid
from collections.abc import Coroutine, Generator
from typing import Any, Callable, cast, Optional, TYPE_CHECKING, Union
import pika
import pika.exceptions
from pika.adapters.twisted_connection import ClosableDeferredQueue, TwistedChannel
from twisted.internet import defer, error, reactor, threads
from twisted.python.failure import Failure
from .. import config
from ..exceptions import (
ConnectionException,
ConsumerCanceled,
Drop,
HaltConsumer,
Nack,
PermissionException,
ValidationError,
)
from ..message import get_message
from .stats import ConsumerStatistics
if TYPE_CHECKING:
import pika.frame
from typing_extensions import TypeAlias, TypeGuard
from .protocol import FedoraMessagingProtocolV2
_std_log = logging.getLogger(__name__)
QueueContent: "TypeAlias" = tuple[
TwistedChannel, pika.spec.Basic.Deliver, pika.BasicProperties, bytes
]
def is_coro(
func_or_obj: config.CallbackType,
) -> "TypeGuard[Callable[..., Coroutine[Any, Any, Any]]]":
"""Tests if a function is a coroutine function or a callable coroutine object."""
# Until Python 3.10, inspect.iscoroutinefunction() will fail to identify AsyncMocks
# as coroutines: https://github.com/python/cpython/issues/84753
# Use asyncio.iscoroutinefunction() instead.
return asyncio.iscoroutinefunction(func_or_obj) or (
callable(func_or_obj) and asyncio.iscoroutinefunction(func_or_obj.__call__) # type: ignore
)
[docs]
class Consumer:
"""
Represents a Twisted AMQP consumer and is returned from the call to
:func:`fedora_messaging.api.twisted_consume`.
Attributes:
queue (str): The AMQP queue this consumer is subscribed to.
callback (callable): The callback to run when a message arrives.
result (twisted.internet.defer.Deferred[Consumer]):
A deferred that runs the callbacks if the consumer exits gracefully
after being canceled by a call to :meth:`Consumer.cancel` and
errbacks if the consumer stops for any other reason. The reasons a
consumer could stop are: a :class:`.PermissionException` if the
consumer does not have permissions to read from the queue it is
subscribed to, a :class:`.HaltConsumer` is raised by the consumer
indicating it wishes to halt, an unexpected :class:`Exception` is
raised by the consumer, or if the consumer is canceled by the
server which happens if the queue is deleted by an administrator or
if the node the queue lives on fails.
"""
def __init__(self, queue: Optional[str] = None, callback: Optional[config.CallbackType] = None):
self.queue = queue
self.callback = callback
self.result: defer.Deferred[Consumer] = defer.Deferred()
# The current channel used by this consumer.
self._channel: Optional[TwistedChannel] = None
# The unique ID for the AMQP consumer.
self._tag = str(uuid.uuid4())
# Used in the consumer read loop to know when it's being canceled.
self._running = False
# The current read loop
self._read_loop = defer.succeed(None)
# The protocol that currently runs this consumer, used when cancel is
# called to remove itself from the protocol and its factory so it doesn't
# restart on the next connection.
self._protocol: Union[FedoraMessagingProtocolV2, None] = None
# Message statistics
self.stats = ConsumerStatistics()
def __repr__(self) -> str:
return f"Consumer(queue={self.queue}, callback={self.callback})"
@property
def running(self) -> bool:
"""Whether the consumer is running."""
return self._running
@defer.inlineCallbacks
def consume(self) -> Generator[defer.Deferred[Any], Any]:
if self._channel is None:
raise RuntimeError("No channel, open one first")
if self.queue is None:
raise RuntimeError("No queue name defined")
yield self._channel.basic_qos(
prefetch_count=config.conf["qos"]["prefetch_count"],
prefetch_size=config.conf["qos"]["prefetch_size"],
)
try:
queue_object: ClosableDeferredQueue
queue_object, _ = yield self._channel.basic_consume(
queue=self.queue, consumer_tag=self._tag
)
except pika.exceptions.ChannelClosed as exc:
if exc.args[0] == 403:
raise PermissionException(
obj_type="queue", description=self.queue, reason=exc.args[1]
) from exc
else:
raise ConnectionException(reason=exc) from exc
self._running = True
self._read_loop = self._read(queue_object)
self._read_loop.addErrback(self._read_loop_errback)
@defer.inlineCallbacks
def _read(self, queue_object: ClosableDeferredQueue) -> Generator[defer.Deferred[None]]:
"""
The loop that reads from the message queue and calls the callback
wrapper.
Serialized Processing
---------------------
This loop processes messages serially. This is because a second
``queue_object.get()`` operation can only occur after the Deferred from
the callback completes. Thus, we can be sure that callbacks
never run concurrently in two different threads.
This is done rather than saturating the Twisted thread pool as the
documentation for callbacks (in fedmsg and here) has never indicated
that they are not thread-safe. In the future we can add a flag for users
who are confident in their ability to write thread-safe code.
Gracefully Halting
------------------
This is a loop that only exits when the ``self._running`` variable is
set to False. The call to cancel will set this to false, and will then
wait for the Deferred from this function to call back in order to
ensure the message finishes processing.
The Deferred object only completes when this method returns, so we need
to periodically check the status of ``self._running``. That's why
there's a short timeout on the call to ``queue_object.get``.
Args:
queue_object: The AMQP queue the consumer is bound to.
"""
while self._running:
yield self._read_one(queue_object)
@defer.inlineCallbacks
def _read_one(self, queue_object: ClosableDeferredQueue) -> Generator[
defer.Deferred[Any],
QueueContent,
Any,
]:
try:
deferred_get = queue_object.get()
# Type ignored because of https://github.com/twisted/twisted/issues/9909
deferred_get.addTimeout(1, reactor) # type: ignore
channel, delivery_frame, properties, body = yield deferred_get
except (defer.TimeoutError, defer.CancelledError):
return
_std_log.debug(
"Message arrived with delivery tag %s for %r",
delivery_frame.delivery_tag,
self._tag,
)
try:
topic: str = delivery_frame.routing_key or ""
message = get_message(topic, properties, body)
message.queue = self.queue
except ValidationError:
_std_log.warning(
"Message id %s did not pass validation; ignoring message",
properties.message_id,
)
channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=False)
return
self.stats.received += 1
try:
_std_log.info(
"Consuming message from topic %s (message id %s)",
message.topic,
properties.message_id,
)
if self.callback is not None and is_coro(self.callback):
d = defer.Deferred.fromFuture(asyncio.ensure_future(self.callback(message)))
else:
d = threads.deferToThread(self.callback, message)
yield d
except Nack:
_std_log.warning("Returning message id %s to the queue", properties.message_id)
channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=True)
self.stats.rejected += 1
except Drop:
_std_log.warning("Consumer requested message id %s be dropped", properties.message_id)
channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=False)
self.stats.dropped += 1
except HaltConsumer as e:
_std_log.info("Consumer indicated it wishes consumption to halt, shutting down")
if e.requeue:
channel.basic_nack(delivery_tag=delivery_frame.delivery_tag, requeue=True)
self.stats.rejected += 1
else:
channel.basic_ack(delivery_tag=delivery_frame.delivery_tag or 0)
self.stats.processed += 1
raise e
except Exception as e:
_std_log.exception("Received unexpected exception from consumer %r", self)
channel.basic_nack(delivery_tag=0, multiple=True, requeue=True)
self.stats.failed += 1
raise e
else:
_std_log.info(
"Successfully consumed message from topic %s (message id %s)",
message.topic,
properties.message_id,
)
channel.basic_ack(delivery_tag=delivery_frame.delivery_tag or 0)
self.stats.processed += 1
def _on_cancel_callback(
self, frame: Optional["pika.frame.Method[pika.spec.Basic.Cancel]"]
) -> None:
"""
Called when the consumer is canceled server-side.
This can happen, for example, when the queue is deleted.
To handle this, we do the necessary book-keeping to remove the consumer
and then fire the errback on the consumer so the caller of
:func:`fedora_messaging.api.consume` can decide what to do.
Args:
frame: The cancel method from the server, unused here because we already know what
consumer is being canceled.
"""
_std_log.error("%r was canceled by the AMQP broker!", self)
if self._protocol is not None:
self._protocol._forget_consumer(self.queue)
self._running = False
self.result.errback(fail=ConsumerCanceled())
def _read_loop_errback(self, failure: Failure) -> None:
"""
Handle errors coming out of the read loop.
There are two basic categories of errors: ones where the ``consumer.result``
Deferred needs to be fired because the error is not recoverable, ones
where we can recover from by letting the connection restart, and ones
which are fatal for this consumer only (the queue was deleted by an
administrator).
Args:
failure: The exception raised by the read loop encapsulated in a Failure.
"""
if failure.check(pika.exceptions.ConsumerCancelled):
# Pika 1.0.0+ raises this exception. To support previous versions
# we register a callback (called below) ourselves with the channel.
self._on_cancel_callback(None)
elif failure.check(pika.exceptions.ChannelClosed):
failure.value = cast(pika.exceptions.ChannelClosed, failure.value)
if failure.value.args[0] == 403:
# This is a mis-configuration, the consumer can register itself,
# but it doesn't have permissions to read from the queue,
# so no amount of restarting will help.
e = PermissionException(
obj_type="queue",
description=self.queue,
reason=failure.value.args[1],
)
self.result.errback(Failure(e, PermissionException))
self.cancel()
else:
_std_log.exception(
"Consumer halted (%r) unexpectedly; the connection should restart.",
failure,
)
elif failure.check(error.ConnectionDone, error.ConnectionLost):
_std_log.warning(
"The connection to the broker was lost (%r), consumer halted; "
"the connection should restart and consuming will resume.",
failure.value,
)
elif failure.check(pika.exceptions.ChannelWrongStateError):
_std_log.warning(
"The channel was closed by the server, you may have to increase the "
"consumer_timeout configuration on RabbitMQ and/or decrease qos.prefetch_count "
"in the fedora-messaging configuration file. Consuming will resume, starting "
"with the last processed message again."
)
self._restart_consuming()
elif failure.check(pika.exceptions.AMQPError):
_std_log.exception(
"An unexpected AMQP error occurred; the connection should "
"restart, but please report this as a bug."
)
else:
self.result.errback(failure)
self.cancel()
@defer.inlineCallbacks
def _restart_consuming(self):
if self._protocol is None: # pragma: no cover
return
self._channel = yield self._protocol._allocate_channel()
yield self.consume()
[docs]
@defer.inlineCallbacks
def cancel(self) -> Generator[defer.Deferred[Any], Any]:
"""
Cancel the consumer and clean up resources associated with it.
Consumers that are canceled are allowed to finish processing any
messages before halting.
Returns:
A deferred that fires when the consumer has finished processing any message it was
in the middle of and has been successfully canceled.
"""
if self._protocol is None or self._channel is None:
return
# Remove it from protocol and factory so it doesn't restart later.
try:
self._protocol._forget_consumer(self.queue)
except AttributeError:
pass
# Signal to the _read loop it's time to stop and wait for it to finish
# with whatever message it might be working on, then wait for the deferred
# to fire which indicates it is done.
self._running = False
yield self._read_loop
try:
# TODO: bug in types-pika
yield cast(
defer.Deferred["pika.frame.Method[pika.spec.Basic.CancelOk]"],
self._channel.basic_cancel(consumer_tag=self._tag),
)
except pika.exceptions.AMQPChannelError:
# Consumers are tied to channels, so if this channel is dead the
# consumer should already be canceled (and we can't get to it anyway)
pass
try:
# TODO: bug in pika, TwistedChannel.close() should return a Deferred
# (TwistedChannel.on_closed)
yield defer.maybeDeferred(self._channel.close)
except pika.exceptions.AMQPChannelError:
pass
if not self.result.called:
self.result.callback(self)