Consumers

This library aims to make implementing a message consumer as simple as possible. It provides the common boilerplate code and a command line interface for starting a consumer as a service under init systems like systemd.

Introduction

AMQP consumers configure a queue for their use in the message broker. When a message is published to an exchange and matches the bindings the consumer has declared, the message is placed in the queue and eventually delivered to the consumer. Fedora uses a topic exchange for general-purpose messages.
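
To make "matches the bindings" concrete, here is a minimal sketch of how a topic exchange compares a binding pattern with a message's routing key. It follows the AMQP topic rules (words are separated by dots, "*" matches exactly one word, "#" matches zero or more words). The function name topic_matches and the topic names are hypothetical and only for illustration; the broker performs this matching itself, and this is not code from the library.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if an AMQP topic binding pattern matches a routing key."""

    def match(pattern_words, key_words):
        if not pattern_words:
            # The pattern is exhausted: it matches only if the key is too.
            return not key_words
        head, rest = pattern_words[0], pattern_words[1:]
        if head == "#":
            # "#" may absorb zero or more words, so try every split point.
            return any(
                match(rest, key_words[i:]) for i in range(len(key_words) + 1)
            )
        if not key_words:
            return False
        if head == "*" or head == key_words[0]:
            # "*" matches exactly one word; a literal word must be equal.
            return match(rest, key_words[1:])
        return False

    return match(pattern.split("."), routing_key.split("."))


# Illustrative topic names, not taken from a real deployment.
print(topic_matches("#", "org.example.build.finished"))  # True
print(topic_matches("org.example.*.finished", "org.example.build.finished"))  # True
print(topic_matches("org.example.*", "org.example.build.finished"))  # False
```

A binding with the routing key "#" therefore receives every message published to the exchange.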

Fortunately, you don’t need to manage the connection to the broker or configure the queue. All you need to do is implement the code to run when a message is received. The API expects a callable object that accepts a single positional argument:

from fedora_messaging import api, config

# The fedora_messaging API does not automatically configure logging so as
# to not destroy application logging setup. This is a convenience method
# to configure the Python logger with the fedora-messaging logging config.
config.conf.setup_logging()

# First, define a function to be used as our callback. This will be called
# whenever a message is received from the server.
def printer_callback(message):
    """
    Print the message to standard output.

    Args:
        message (fedora_messaging.message.Message): The message we received
            from the queue.
    """
    print(str(message))

# Next, we need a queue to consume messages from. We can define
# the queue and binding configurations in these dictionaries:
queues = {
    'demo': {
        'durable': False,  # Delete the queue on broker restart
        'auto_delete': True,  # Delete the queue when the client terminates
        'exclusive': False,  # Allow multiple simultaneous consumers
        'arguments': {},
    },
}
binding = {
    'exchange': 'amq.topic',  # The AMQP exchange to bind our queue to
    'queue': 'demo',  # The unique name of our queue on the AMQP broker
    'routing_keys': ['#'],  # The topics that should be delivered to the queue
}

# Start consuming messages using our callback. This call will block until
# a KeyboardInterrupt is raised, or the process receives a SIGINT or SIGTERM
# signal.
api.consume(printer_callback, bindings=binding, queues=queues)

In this example, there’s one queue and the queue only has one binding, but it’s possible to consume from multiple queues and each queue can have multiple bindings.
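
As a rough sketch of the multi-queue case, the configuration below defines two queues and three bindings, two of which feed the same queue. The queue names, routing keys, and the check_bindings helper are hypothetical, and the sketch assumes that consume() accepts a list of binding dictionaries as well as a single dictionary; consult the API documentation to confirm.

```python
# Two hypothetical queues: one transient, one that survives broker restarts.
queues = {
    "demo_builds": {
        "durable": False,
        "auto_delete": True,
        "exclusive": False,
        "arguments": {},
    },
    "demo_updates": {
        "durable": True,
        "auto_delete": False,
        "exclusive": False,
        "arguments": {},
    },
}

# One entry per binding; several bindings may name the same queue.
bindings = [
    {
        "exchange": "amq.topic",
        "queue": "demo_builds",
        "routing_keys": ["org.example.build.#"],
    },
    {
        "exchange": "amq.topic",
        "queue": "demo_updates",
        "routing_keys": ["org.example.update.*"],
    },
    {
        "exchange": "amq.topic",
        "queue": "demo_updates",
        "routing_keys": ["org.example.comment.*"],
    },
]


def check_bindings(queues, bindings):
    """Return the names of queues that a binding uses but nobody defined."""
    return sorted({b["queue"] for b in bindings} - set(queues))


def printer_callback(message):
    print(str(message))


def main():
    # Imported here so the configuration above can be checked without
    # the library installed or a broker running.
    from fedora_messaging import api

    api.consume(printer_callback, bindings=bindings, queues=queues)
```

Calling main() starts consuming and blocks; check_bindings(queues, bindings) returns an empty list when every binding refers to a defined queue.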

Command Line Interface

A command line interface, fedora-messaging, is included to make running consumers easier. It’s not necessary to write any boilerplate code calling the API; just run fedora-messaging consume and provide it with the Python path to your callback:

$ fedora-messaging consume --callback=fedora_messaging.example:printer

Consult the manual page for complete details on this command line interface.

Note

For users of fedmsg, this is roughly equivalent to fedmsg-hub.

Consumer API

The introduction contains a very minimal callback. This section covers the complete API for consumers.

The Callback

The callback provided to fedora_messaging.api.consume() or the command-line interface can be any callable Python object, so long as it accepts the message object as a single positional argument.

The API will also accept a Python class, which it will instantiate before using as a callable object. This allows you to write a callback with easy one-time initialization or a callback that maintains state between calls:

import os

from fedora_messaging import config


class SaveMessage:
    """
    A fedora-messaging consumer that saves the message to a file.

    A single configuration key is used from fedora-messaging's
    "consumer_config" key, "path", which is where the consumer will save
    the messages::

        [consumer_config]
        path = "/tmp/fedora-messaging/messages.txt"
    """

    def __init__(self):
        """Perform some one-time initialization for the consumer."""
        self.path = config.conf["consumer_config"]["path"]

        # Ensure the parent directory exists before the consumer starts
        os.makedirs(os.path.dirname(self.path), exist_ok=True)

    def __call__(self, message):
        """
        Invoked when a message is received by the consumer.

        Args:
            message (fedora_messaging.api.Message): The message from AMQP.
        """
        with open(self.path, "a") as fd:
            fd.write(str(message))

When running this type of callback from the command-line interface, specify the Python path to the class object, not the __call__ method:

$ fedora-messaging consume --callback=package_name.module:SaveMessage

Exceptions

  • Consumers should raise the fedora_messaging.exceptions.Nack exception if the consumer cannot handle the message at this time. The message will be re-queued, and the server will attempt to re-deliver it at a later time.

  • Consumers should raise the fedora_messaging.exceptions.Drop exception when they wish to explicitly indicate they do not want to handle the message. This is similar to simply returning from the callback, but the server is informed that the client dropped the message. What the server does with a dropped message depends on its configuration.

  • Consumers should raise the fedora_messaging.exceptions.HaltConsumer exception if they wish to stop consuming messages.

If a consumer raises any other exception, a traceback will be logged at the error level, the message being processed and any pre-fetched messages will be returned to the queue for later delivery, and the consumer will be canceled.

If the CLI is being used, it will halt with a non-zero exit code. If the API is being used directly, consult the API documentation for exact results, as the synchronous and asynchronous APIs communicate failures differently.
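
The three exceptions described above might be combined in a callback along the following lines. This is a sketch under assumptions: the decide() helper, the backend_is_up() check, the ".shutdown" topic suffix, and the "package" body key are all hypothetical, and HaltConsumer is raised without arguments here, so check the API documentation for the options it accepts.

```python
def decide(topic: str, body: dict, backend_available: bool) -> str:
    """Pick an outcome for a message: 'ack', 'nack', 'drop' or 'halt'."""
    if topic.endswith(".shutdown"):
        return "halt"
    if "package" not in body:
        # Not a message this consumer understands; retrying will not help.
        return "drop"
    if not backend_available:
        # A temporary problem on our side; ask for redelivery later.
        return "nack"
    return "ack"


def backend_is_up() -> bool:
    """Hypothetical health check for whatever the consumer writes to."""
    return True


def callback(message):
    # Imported lazily so decide() can be exercised without the library.
    from fedora_messaging.exceptions import Drop, HaltConsumer, Nack

    outcome = decide(message.topic, message.body, backend_is_up())
    if outcome == "halt":
        raise HaltConsumer()
    if outcome == "drop":
        raise Drop()
    if outcome == "nack":
        raise Nack()
    # Returning normally lets the library treat the message as handled.
    print(message.body["package"])
```

Keeping the decision in a plain function makes it easy to unit test without a broker, while the callback only translates the outcome into the matching exception.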

Synchronous and Asynchronous Calls

The AMQP consumer runs in a Twisted event loop. When a message arrives, it calls the callback in a separate Python thread to avoid blocking vital operations like the connection heartbeat. The callback is free to use any blocking (synchronous) calls it likes.

Note

Your callback does not need to be thread-safe. By default, messages are processed serially.

It is safe to start threads to perform IO-blocking work concurrently. If you wish to make use of a Twisted API, you must use the twisted.internet.threads.blockingCallFromThread() or twisted.internet.interfaces.IReactorFromThreads APIs.
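
For the thread-based case, a callback could hand blocking work to a standard-library thread pool, as in this minimal sketch. The fetch() function, the "urls" body key, and the FetchLinks class are hypothetical stand-ins; a real consumer would perform actual network or disk I/O inside fetch(). The stand-in message at the end only exists so the sketch runs without a broker.

```python
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace


def fetch(url):
    """Hypothetical blocking I/O call; replace with a real request."""
    return f"fetched {url}"


class FetchLinks:
    def __init__(self):
        # One pool for the consumer's lifetime, created at start-up.
        self.pool = ThreadPoolExecutor(max_workers=4)
        self.last_results = []

    def __call__(self, message):
        urls = message.body.get("urls", [])
        # map() runs fetch() concurrently and yields results in input order.
        self.last_results = list(self.pool.map(fetch, urls))


# Stand-in for a received message, used only to exercise the callback.
callback = FetchLinks()
callback(SimpleNamespace(body={"urls": ["a", "b"]}))
print(callback.last_results)  # ['fetched a', 'fetched b']
```

Because the callback waits for all results before returning, the next message is not processed until the current one is finished.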

You may also use asyncio-based asynchronous callbacks, either via an async function or via an object that has an async __call__() method. In this case, the callback is not run in a separate thread. Instead, it is scheduled as a regular asyncio task.
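
An asynchronous callback with state might look like the following sketch. The AsyncCounter class and the topic name are hypothetical, and asyncio.sleep(0) stands in for real awaitable work such as an asynchronous HTTP request. The last two lines drive the callback by hand with a stand-in message; when run by the library, the scheduling is handled for you.

```python
import asyncio
from types import SimpleNamespace


class AsyncCounter:
    """Count messages using an async __call__() method."""

    def __init__(self):
        self.seen = 0

    async def __call__(self, message):
        # Awaiting yields to the event loop instead of blocking it.
        await asyncio.sleep(0)
        self.seen += 1
        print(f"{self.seen}: {message.topic}")


# Stand-in for a received message, used only to exercise the callback.
counter = AsyncCounter()
asyncio.run(counter(SimpleNamespace(topic="org.example.demo")))
```

Since such a callback shares the event loop's thread, it should avoid blocking calls and await asynchronous equivalents instead.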

Consumer Configuration

A special section of the fedora-messaging configuration is available for consumers that need their own configuration options. Refer to the consumer_config option in the Configuration documentation for details.
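
One way a consumer might read this section is to merge it over its own defaults and reject unexpected keys early, as sketched below. The load_settings helper, the "max_size" key, and the default values are hypothetical; only the "path" key comes from the SaveMessage example earlier in this document.

```python
def load_settings(consumer_config):
    """Merge values from consumer_config over hypothetical defaults."""
    defaults = {
        "path": "/tmp/fedora-messaging/messages.txt",
        "max_size": 1024,
    }
    unknown = set(consumer_config) - set(defaults)
    if unknown:
        # Fail at start-up rather than silently ignoring a typo.
        raise ValueError(f"Unknown consumer_config keys: {sorted(unknown)}")
    return {**defaults, **consumer_config}


# In a consumer this would typically be:
#     settings = load_settings(config.conf["consumer_config"])
settings = load_settings({"path": "/var/tmp/messages.txt"})
print(settings["path"], settings["max_size"])
```

Validating in the class's __init__() means a misconfigured consumer stops before it receives its first message.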

systemd Service

A systemd service file is also included in the Python package for your convenience. It is called fm-consumer@.service and simply runs fedora-messaging consume with a configuration file from /etc/fedora-messaging/ that matches the service name:

$ systemctl start fm-consumer@sample.service  # uses /etc/fedora-messaging/sample.toml