Consumers¶
This library is aimed at making implementing a message consumer as simple as possible by implementing common boilerplate code and offering a command line interface to easily start a consumer as a service under init systems like systemd.
Introduction¶
AMQP consumers configure a queue for their use in the message broker. When a message is published to an exchange and matches the bindings the consumer has declared, the message is placed in the queue and eventually delivered to the consumer. Fedora uses a topic exchange for general-purpose messages.
Fortunately, you don’t need to manage the connection to the broker or configure the queue. All you need to do is to implement some code to run when a message is received. The API expects a callable object that accepts a single positional argument:
from fedora_messaging import api, config
# The fedora_messaging API does not automatically configure logging so as
# to not destroy application logging setup. This is a convenience method
# to configure the Python logger with the fedora-messaging logging config.
config.conf.setup_logging()
# First, define a function to be used as our callback. This will be called
# whenever a message is received from the server.
def printer_callback(message):
"""
Print the message to standard output.
Args:
message (fedora_messaging.message.Message): The message we received
from the queue.
"""
print(str(message))
# Next, we need a queue to consume messages from. We can define
# the queue and binding configurations in these dictionaries:
queues = {
'demo': {
'durable': False, # Delete the queue on broker restart
'auto_delete': True, # Delete the queue when the client terminates
'exclusive': False, # Allow multiple simultaneous consumers
'arguments': {},
},
}
binding = {
'exchange': 'amq.topic', # The AMQP exchange to bind our queue to
'queue': 'demo', # The unique name of our queue on the AMQP broker
'routing_keys': ['#'], # The topics that should be delivered to the queue
}
# Start consuming messages using our callback. This call will block until
# a KeyboardInterrupt is raised, or the process receives a SIGINT or SIGTERM
# signal.
api.consume(printer_callback, bindings=binding, queues=queues)
In this example, there’s one queue and the queue only has one binding, but it’s possible to consume from multiple queues and each queue can have multiple bindings.
Command Line Interface¶
A command line interface, fedora-messaging, is included to make running
consumers easier. It’s not necessary to write any boilerplate code calling the
API, just run fedora-messaging consume
and provide it the Python path to
your callback:
$ fedora-messaging consume --callback=fedora_messaging.example:printer
Consult the manual page for complete details on this command line interface.
Note
For users of fedmsg, this is roughly equivalent to fedmsg-hub
Consumer API¶
The introduction contains a very minimal callback. This section covers the complete API for consumers.
The Callback¶
The callback provided to fedora_messaging.api.consume()
or the command-line
interface can be any callable Python object, so long as it accepts the message
object as a single positional argument.
The API will also accept a Python class, which it will instantiate before using as a callable object. This allows you to write a callback with easy one-time initialization or a callback that maintains state between calls:
import os
from fedora_messaging import config
class SaveMessage:
"""
A fedora-messaging consumer that saves the message to a file.
A single configuration key is used from fedora-messaging's
"consumer_config" key, "path", which is where the consumer will save
the messages::
[consumer_config]
path = "/tmp/fedora-messaging/messages.txt"
"""
def __init__(self):
"""Perform some one-time initialization for the consumer."""
self.path = config.conf["consumer_config"]["path"]
# Ensure the path exists before the consumer starts
if not os.path.exists(os.path.dirname(self.path)):
os.mkdir(os.path.dirname(self.path))
def __call__(self, message):
"""
Invoked when a message is received by the consumer.
Args:
message (fedora_messaging.api.Message): The message from AMQP.
"""
with open(self.path, "a") as fd:
fd.write(str(message))
When running this type of callback from the command-line interface, specify
the Python path to the class object, not the __call__
method:
$ fedora-messaging consume --callback=package_name.module:SaveMessage
Exceptions¶
Consumers should raise the
fedora_messaging.exceptions.Nack
exception if the consumer cannot handle the message at this time. The message will be re-queued, and the server will attempt to re-deliver it at a later time.Consumers should raise the
fedora_messaging.exceptions.Drop
exception when they wish to explicitly indicate they do not want handle the message. This is similar to simply callingreturn
, but the server is informed the client dropped the message. What the server does depends on configuration.Consumers should raise the
fedora_messaging.exceptions.HaltConsumer
exception if they wish to stop consuming messages.
If a consumer raises any other exception, a traceback will be logged at the error level, the message being processed and any pre-fetched messages will be returned to the queue for later delivery, and the consumer will be canceled.
If the CLI is being used, it will halt with a non-zero exit code. If the API is being used directly, consult the API documentation for exact results, as the synchronous and asynchronous APIs communicate failures differently.
Synchronous and Asynchronous Calls¶
The AMQP consumer runs in a Twisted event loop. When a message arrives, it calls the callback in a separate Python thread to avoid blocking vital operations like the connection heartbeat. The callback is free to use any blocking (synchronous) calls it likes.
Note
Your callback does not need to be thread-safe. By default, messages are processed serially.
It is safe to start threads to perform IO-blocking work concurrently. If you
wish to make use of a Twisted API, you must use the
twisted.internet.threads.blockingCallFromThread()
or
twisted.internet.interfaces.IReactorFromThreads
APIs.
You may also use asyncio-based asynchronous callbacks, either via an async
function or via an object that has an async __call__()
method. In this
case, the callback will not be run in a separate thread, it will instead be
scheduled as a regular asyncio task.
Consumer Configuration¶
A special section of the fedora-messaging configuration will be available for consumers to use if they need configuration options. Refer to the consumer_config in the Configuration documentation for details.
systemd Service¶
A systemd service file is also included in the Python package for your
convenience. It is called fm-consumer@.service
and simply runs
fedora-messaging consume
with a configuration file from
/etc/fedora-messaging/
that matches the service name:
$ systemctl start fm-consumer@sample.service # uses /etc/fedora-messaging/sample.toml