Developer Interface

This documentation covers the public interfaces fedora_messaging provides.

Note

Documented interfaces follow Semantic Versioning 2.0.0. Any interface not documented here may change at any time without warning.

Publishing

publish

fedora_messaging.api.publish(message, exchange=None, timeout=30)[source]

Publish a message to an exchange.

This is a synchronous call, meaning that when this function returns, an acknowledgment has been received from the message broker and you can be certain the message was published successfully.

There are some cases where an error occurs despite your message being successfully published, for example if a network partition occurs after the message is received by the broker. If you publish again after such an error, you may publish duplicate messages. For complete details, see the Publishing documentation.

>>> from fedora_messaging import api
>>> message = api.Message(body={'Hello': 'world'}, topic='Hi')
>>> api.publish(message)

If an attempt to publish fails because the broker rejects the message, it is not retried. Connection attempts to the broker can be configured using the “connection_attempts” and “retry_delay” options in the broker URL. See pika.connection.URLParameters for details.
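Because a rejected publish is not retried by the library, an application that wants another attempt has to make it itself. The following is a minimal sketch of one way to do that, not part of fedora_messaging: `publish_with_retry`, `attempts`, and `delay` are hypothetical names, and the publish function and the exception types worth retrying are passed in by the caller.

```python
import time


def publish_with_retry(publish, message, retriable, attempts=3, delay=1.0):
    """Call publish(message), retrying when a retriable exception is raised.

    publish   -- a callable such as fedora_messaging.api.publish (assumption:
                 the caller supplies it, so this sketch has no hard dependency)
    retriable -- tuple of exception classes that are worth another attempt
    Returns the number of attempts used; re-raises the last error on failure.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            publish(message)
            return attempt
        except retriable:
            if attempt == attempts:
                # Out of attempts: let the caller see the original error.
                raise
            time.sleep(delay)
```

An application might call it as `publish_with_retry(api.publish, message, (exceptions.ConnectionException, exceptions.PublishReturned))`. Since an error can occur after the broker has already accepted a message, retrying in this way can produce duplicates.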

Parameters:
  • message (message.Message) – The message to publish.

  • exchange (str) – The name of the AMQP exchange to publish to. Defaults to the publish_exchange configuration option.

  • timeout (int) – The maximum time in seconds to wait before giving up attempting to publish the message. If the timeout is reached, a PublishTimeout exception is raised.

Raises:

Subscribing

twisted_consume

fedora_messaging.api.twisted_consume(callback, bindings=None, queues=None)[source]

Start a consumer using the provided callback and run it using the Twisted event loop (reactor).

Note

Callbacks run in a Twisted-managed thread pool using the twisted.internet.threads.deferToThread() API to avoid them blocking the event loop. If you wish to use Twisted APIs in your callback you must use the twisted.internet.threads.blockingCallFromThread() or twisted.internet.interfaces.IReactorFromThreads APIs.

This API expects the caller to start the reactor.

Parameters:
  • callback (callable) – Either a callable object that accepts one positional argument, a Message, or a class object that implements the __call__ method. A class will be instantiated before use.

  • bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.

  • queues (dict) – The queue to declare and consume from. Each key in this dictionary should be a queue name to declare, and each value should be a dictionary with the “durable”, “auto_delete”, “exclusive”, and “arguments” keys.

Raises:

ValueError – If the callback, bindings, or queues are invalid.

Returns:

A deferred that fires with the list of one or more Consumer objects. Each consumer object has a Consumer.result instance variable that is a Deferred that fires or errors when the consumer halts. Note that this API is meant to survive network problems, so consuming will continue until Consumer.cancel() is called or a fatal server error occurs. The deferred returned by this function may error back with a fedora_messaging.exceptions.BadDeclaration if queues or bindings cannot be declared on the broker, a fedora_messaging.exceptions.PermissionException if the user doesn’t have access to the queue, or fedora_messaging.exceptions.ConnectionException if the TLS or AMQP handshake fails.

Return type:

twisted.internet.defer.Deferred

Consumer

class fedora_messaging.api.Consumer(queue=None, callback=None)[source]

Represents a Twisted AMQP consumer and is returned from the call to fedora_messaging.api.twisted_consume().

queue

The AMQP queue this consumer is subscribed to.

Type:

str

callback

The callback to run when a message arrives.

Type:

callable

result

A deferred that runs its callbacks if the consumer exits gracefully after being canceled by a call to Consumer.cancel(), and errbacks if the consumer stops for any other reason. A consumer can stop for any of the following reasons: a fedora_messaging.exceptions.PermissionException occurs because the consumer does not have permission to read from the queue it is subscribed to, the consumer raises HaltConsumer to indicate it wishes to halt, the consumer raises an unexpected Exception, or the server cancels the consumer, which happens if the queue is deleted by an administrator or if the node the queue lives on fails.

Type:

twisted.internet.defer.Deferred

cancel()[source]

Cancel the consumer and clean up resources associated with it. Consumers that are canceled are allowed to finish processing any messages before halting.

Returns:

A deferred that fires when the consumer has finished processing any message it was in the middle of and has been successfully canceled.

Return type:

defer.Deferred

consume

fedora_messaging.api.consume(callback, bindings=None, queues=None)[source]

Start a message consumer that executes the provided callback when messages are received.

This API is blocking and will not return until the process receives a signal from the operating system.

Warning

This API runs the callback in the IO loop thread. This means that if your callback runs for a length of time near the heartbeat interval, which is likely on the order of 60 seconds, the broker will kill the TCP connection and the message will be re-delivered on start-up.

For now, if you have a long-running callback, use the twisted_consume() API, which runs the callback in a thread and continues to handle AMQP events while the callback runs.

The callback receives a single positional argument, the message:

>>> from fedora_messaging import api
>>> def my_callback(message):
...     print(message)
>>> bindings = [{'exchange': 'amq.topic', 'queue': 'demo', 'routing_keys': ['#']}]
>>> queues = {
...     "demo": {"durable": False, "auto_delete": True, "exclusive": True, "arguments": {}}
... }
>>> api.consume(my_callback, bindings=bindings, queues=queues)

If the bindings and queues arguments are not provided, they will be loaded from the configuration.

For complete documentation on writing consumers, see the Consumers documentation.

Parameters:
  • callback (callable) – Either a callable object that accepts one positional argument, a Message, or a class object that implements the __call__ method. A class will be instantiated before use.

  • bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.

  • queues (dict) – The queue or queues to declare and consume from. This should be in the same format as the queues configuration dictionary where each key is a queue name and each value is a dictionary of settings for that queue.

Raises:
  • fedora_messaging.exceptions.HaltConsumer – If the consumer requests that it be stopped.

  • ValueError – If the consumer provides a callback that is not a class that implements __call__ and is not a function, if the bindings argument is not a dict or list of dicts with the proper keys, or if the queues argument isn’t a dict with the proper keys.
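As an illustration of the class form of the callback argument, here is a minimal sketch of a callback class that keeps state between messages. `TopicCollector` is a hypothetical name, and the sketch assumes the class can be constructed without arguments, since the library is documented to instantiate it before use.

```python
class TopicCollector:
    """A callback class that records the topic of every message it receives."""

    def __init__(self):
        # State lives on the instance, so it persists between messages.
        self.topics = []

    def __call__(self, message):
        # `message.topic` is the documented topic attribute of Message.
        self.topics.append(message.topic)
        print(f"Received message on topic {message.topic}")
```

The class itself, rather than an instance, would be passed as the callback, for example `api.consume(TopicCollector)`.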

Signals

Signals sent by fedora_messaging APIs. They are implemented as blinker.base.Signal objects.

pre_publish_signal

fedora_messaging.api.pre_publish_signal = <blinker.base.NamedSignal object at 0x7f7c06908050; 'pre_publish'>

A signal triggered before the message is published. The signal handler should accept a single keyword argument, message, which is the instance of the fedora_messaging.message.Message being sent. It is acceptable to mutate the message, but the validate method will be called on it after this signal.

publish_signal

fedora_messaging.api.publish_signal = <blinker.base.NamedSignal object at 0x7f7c069ca300; 'publish_success'>

A signal triggered after a message is published successfully. The signal handler should accept a single keyword argument, message, which is the instance of the fedora_messaging.message.Message that was sent.

publish_failed_signal

fedora_messaging.api.publish_failed_signal = <blinker.base.NamedSignal object at 0x7f7c06841760; 'publish_failed_signal'>

A signal triggered after a message fails to publish for some reason. The signal handler should accept two keyword arguments: message, which is the instance of the fedora_messaging.message.Message that failed to be sent, and error, the exception that was raised.
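Handlers for these signals are ordinary functions. The sketch below shows what two of them might look like; the function names and the `failed` list are hypothetical. blinker passes the sender as the first positional argument to every receiver, so each handler accepts it in addition to the keyword arguments described above.

```python
import logging

log = logging.getLogger("myapp.signals")  # hypothetical logger name

# Collected failures, kept in memory for illustration only.
failed = []


def on_pre_publish(sender, message):
    """Handler for pre_publish_signal; runs before the message is validated."""
    log.debug("About to publish on topic %s", message.topic)


def on_publish_failed(sender, message, error):
    """Handler for publish_failed_signal; records the topic and the exception."""
    log.warning("Publishing to %s failed: %r", message.topic, error)
    failed.append((message.topic, error))
```

Assuming blinker's standard `Signal.connect()` method, the handlers would be registered with `api.pre_publish_signal.connect(on_pre_publish)` and `api.publish_failed_signal.connect(on_publish_failed)`.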

Message Schemas

This module defines the base class of message objects and keeps a registry of known message implementations. This registry is populated from Python entry points in the “fedora.messages” group.

To implement your own message schema, simply create a class that inherits the Message class, and add an entry point in your Python package under the “fedora.messages” group. For example, an entry point for the Message schema would be:

entry_points = {
    'fedora.messages': [
        'base.message=fedora_messaging.message:Message'
    ]
}

The entry point name must be unique to your application and is used to map messages to your message class, so it’s best to prefix it with your application name (e.g. bodhi.new_update_messageV1). When publishing, the Fedora Messaging library will add a header with the entry point name of the class used so the consumer can locate the correct schema.

Since every client needs to have the message schema installed, you should define this class in a small Python package of its own.

Message

class fedora_messaging.message.Message(body=None, headers=None, topic=None, properties=None, severity=None)[source]

Messages are simply JSON-encoded objects. This allows message authors to define a schema and implement Python methods to abstract the raw message from the user. This allows the schema to change and evolve without breaking the user-facing API.

There are a number of properties that are intended to be overridden by users. These fields are used to sort messages for notifications or are used to create human-readable versions of the messages. Properties that are intended for this purpose are noted in their attribute documentation below.

Parameters:
  • headers (dict) – A set of message headers. Consult the headers schema for expected keys and values.

  • body (dict) – The message body. Consult the body schema for expected keys and values. This dictionary must be JSON-serializable by the default serializer.

  • topic (str) – The message topic as a unicode string. If this is not provided, the default topic for the class is used. See the attribute documentation below for details.

  • properties (pika.BasicProperties) – The AMQP properties. If this is not provided, they will be generated. Most users should not need to provide this, but it can be useful in testing scenarios.

  • severity (int) – An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be DEBUG, INFO, WARNING, or ERROR. The default is INFO, and can be set as a class attribute or on an instance-by-instance basis.

id

The message id as a unicode string. This attribute is automatically generated and set by the library and users should only set it themselves in testing scenarios.

Type:

str

topic

The message topic as a unicode string. The topic is used by message consumers to filter what messages they receive. Topics should be a string of words separated by ‘.’ characters, with a length limit of 255 bytes. Because of this byte limit, it is best to avoid non-ASCII characters. Topics should start general and get more specific with each word. For example, “bodhi.update.kernel” is a possible topic: “bodhi” identifies the application, “update” identifies the message, and “kernel” identifies the package in the update. This can be set at a class level or on an instance level. Dynamic, specific topics that allow for fine-grained filtering are preferred.

Type:

str
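The byte limit on topics is easy to overlook because it counts encoded bytes, not characters. The helper below is a small sketch of a check an application could run before publishing; `build_topic` and `MAX_TOPIC_BYTES` are hypothetical names, and UTF-8 encoding is an assumption of this example.

```python
MAX_TOPIC_BYTES = 255  # limit stated in the topic documentation


def build_topic(*words):
    """Join words from general to specific with '.' and enforce the byte limit."""
    topic = ".".join(words)
    # Measure the encoded size: a non-ASCII character can take several bytes.
    size = len(topic.encode("utf-8"))
    if size > MAX_TOPIC_BYTES:
        raise ValueError(f"topic is {size} bytes; the limit is {MAX_TOPIC_BYTES}")
    return topic
```

For example, `build_topic("bodhi", "update", "kernel")` produces the topic used in the description above, while a topic made of 128 two-byte characters is rejected even though it is well under 255 characters long.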

headers_schema

A JSON schema to be used with jsonschema.validate() to validate the message headers. For most users, the default definition should suffice.

Type:

dict

body_schema

A JSON schema to be used with jsonschema.validate() to validate the message body. The body_schema is retrieved on a message instance so it is not required to be a class attribute, although this is a convenient approach. Users are also free to write the JSON schema as a file and load the file from the filesystem or network if they prefer.

Type:

dict

body

The message body as a Python dictionary. This is validated by the body schema before publishing and before consuming.

Type:

dict

severity

An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be DEBUG, INFO, WARNING, or ERROR. The default is INFO, and can be set as a class attribute or on an instance-by-instance basis.

Type:

int

queue

The name of the queue this message arrived through. This attribute is set automatically by the library and users should never set it themselves.

Type:

str

deprecated

Whether this message schema has been deprecated by a more recent version. Emits a warning when a message of this class is received, to let consumers know that they should plan to upgrade. Defaults to False.

Type:

bool

priority

The priority for the message, if the destination queue supports it. Defaults to zero (lowest priority).

This value is taken into account in queues that have the x-max-priority argument set. Most queues in Fedora don’t support priorities, in which case the value will be ignored.

Larger numbers indicate higher priority. You can read more about it in RabbitMQ’s documentation on priority.

Type:

int

__str__()[source]

A human-readable representation of this message.

This should provide a detailed, long-form representation of the message. The default implementation is to format the raw message id, topic, headers, and body.

Note

Sub-classes should override this method. It is used to create the body of email notifications and by other tools to display messages to humans.

property agent_avatar

A URL to the avatar of the user who caused the action.

Note

Sub-classes should override this method if the default Libravatar and OpenID-based URL generator is not appropriate.

Returns:

The URL to the user’s avatar.

Return type:

str or None

property agent_name

The username of the user who caused the action.

Note

Sub-classes should override this method if the message was triggered by a particular user.

Returns:

The agent’s username.

Return type:

str or None

property app_icon

A URL to the icon of the application that generated the message.

Note

Sub-classes should override this method if their application has an icon and they wish that image to appear in applications that consume messages.

Returns:

The URL to the app’s icon.

Return type:

str or None

property app_name

The name of the application that generated the message.

Note

Sub-classes should override this method.

Returns:

The name of the application.

Return type:

str or None

property containers

List of containers affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to one or more container images. The data returned from this property is used to filter notifications.

Returns:

A list of affected container names.

Return type:

list(str)

property flatpaks

List of flatpaks affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to one or more flatpaks. The data returned from this property is used to filter notifications.

Returns:

A list of affected flatpak names.

Return type:

list(str)

property groups

List of groups affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to a group or groups. The data returned from this property is used to filter notifications.

Returns:

A list of affected groups.

Return type:

list(str)

property modules

List of modules affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to one or more modules. The data returned from this property is used to filter notifications.

Returns:

A list of affected module names.

Return type:

list(str)

property packages

List of RPM packages affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to one or more RPM packages. The data returned from this property is used to filter notifications.

Returns:

A list of affected package names.

Return type:

list(str)

property summary

A short, human-readable representation of this message.

This should provide a short summary of the message, much like the subject line of an email.

Note

Sub-classes should override this method. It is used to create the subject of email notifications and IRC notifications, and by other tools to display messages to humans in short form.

The default implementation is to simply return the message topic.

property url

A URL to the action that caused this message to be emitted.

Note

Sub-classes should override this method if there is a URL associated with the message.

Returns:

A relevant URL.

Return type:

str or None

property usernames

List of users affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to a user or users. The data returned from this property is used to filter notifications.

Returns:

A list of affected usernames.

Return type:

list(str)

validate()[source]

Validate the headers and body with the message schema, if any.

In addition to the user-provided schema, all messages are checked against the base schema, which requires certain message headers and that the body be a JSON object.

Warning

This method should not be overridden by sub-classes.

Raises:
  • jsonschema.ValidationError – If either the message headers or the message body is invalid.

  • jsonschema.SchemaError – If either the message header schema or the message body schema is invalid.

Message Severity

Each message can have a severity associated with it. The severity is used by applications like the notification service to determine what messages to send to users. The severity can be set at the class level, or on a message-by-message basis. The following are valid severity levels:

DEBUG

fedora_messaging.message.DEBUG = 10

Indicates the message is for debugging or is otherwise very low priority. Users will not be notified unless they’ve explicitly requested DEBUG level messages.

INFO

fedora_messaging.message.INFO = 20

Indicates the message is informational. End users will not receive notifications for these messages by default. For example, automated tests passed for their package.

WARNING

fedora_messaging.message.WARNING = 30

Indicates a problem or an otherwise important message. By default, users are notified of these messages when they pertain to packages they are associated with. For example, one or more automated tests failed against their package.

ERROR

fedora_messaging.message.ERROR = 40

Indicates a critically important message that users should act upon as soon as possible. For example, their package no longer builds.

SEVERITIES

fedora_messaging.message.SEVERITIES = (10, 20, 30, 40)

A tuple of all valid severity levels.
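Because the severity levels are plain integers, a consumer can compare them directly. The sketch below shows how a notification-style application might decide whether a message is severe enough to act on. `should_notify` is a hypothetical helper, and the constants are redefined locally with the documented values so that the example stands alone; real code would import them from fedora_messaging.message.

```python
# Values copied from the constants documented above.
DEBUG, INFO, WARNING, ERROR = 10, 20, 30, 40
SEVERITIES = (DEBUG, INFO, WARNING, ERROR)


def should_notify(severity, threshold=WARNING):
    """Return True when a message is at least as severe as the threshold."""
    if severity not in SEVERITIES:
        raise ValueError(f"unknown severity: {severity!r}")
    return severity >= threshold
```

With the default threshold, `should_notify(ERROR)` is true and `should_notify(INFO)` is false; a user who has asked for everything could be handled with `threshold=DEBUG`.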

dumps

fedora_messaging.message.dumps(messages)[source]

Serialize messages to a file format acceptable for loads() or for the publish CLI command. The format is a string where each line is a JSON object that conforms to the SERIALIZED_MESSAGE_SCHEMA format.

Parameters:

messages (list or Message) – The messages to serialize. Each message must be an instance of Message or of one of its subclasses.

Returns:

Serialized messages.

Return type:

str

Raises:

ValidationError – If one of the messages provided doesn’t conform to its schema.

loads

fedora_messaging.message.loads(serialized_messages)[source]

Deserialize messages from a file format produced by dumps(). The format is a string where each line is a JSON object that conforms to the SERIALIZED_MESSAGE_SCHEMA format.

Parameters:

serialized_messages (str) – A string made up of a JSON object per line.

Returns:

Deserialized message objects.

Return type:

list

Raises:

ValidationError – If the string isn’t formatted properly or a message doesn’t pass the message schema validation.

SERIALIZED_MESSAGE_SCHEMA

fedora_messaging.message.SERIALIZED_MESSAGE_SCHEMA = {'$schema': 'http://json-schema.org/draft-04/schema#', 'description': 'Schema for the JSON object used to represent messages in a file', 'properties': {'body': {'description': 'The message body.', 'type': 'object'}, 'headers': {'description': 'The message headers', 'properties': {'fedora_messaging_schema': {'type': 'string'}, 'fedora_messaging_severity': {'enum': [10, 20, 30, 40], 'type': 'number'}, 'sent-at': {'type': 'string'}}, 'type': 'object'}, 'id': {'description': "The message's UUID.", 'type': 'string'}, 'priority': {'description': 'The priority that the message has been sent with.', 'type': ['integer', 'null']}, 'queue': {'description': 'The queue the message arrived on, if any.', 'type': ['string', 'null']}, 'topic': {'description': 'The message topic', 'type': 'string'}}, 'required': ['topic', 'body'], 'type': 'object'}

The schema for each JSON object produced by dumps(), consumed by loads(), and expected by CLI commands like “fedora-messaging publish”.
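To make the one-object-per-line format concrete, the sketch below builds and parses such a string with only the standard json module. It illustrates the shape of the data and nothing more: the record contents are made up, only the required “topic” and “body” keys (plus an optional “queue”) are included, and no schema validation is performed. Real code should use dumps() and loads().

```python
import json

# Two records with the schema's required keys, "topic" and "body".
records = [
    {"topic": "demo.hello", "body": {"Hello": "world"}},
    {"topic": "demo.bye", "body": {"Goodbye": "world"}, "queue": None},
]

# One JSON object per line, as the format description above states.
serialized = "\n".join(json.dumps(record, sort_keys=True) for record in records)

# Reading it back: parse each non-empty line on its own.
parsed = [json.loads(line) for line in serialized.splitlines() if line.strip()]
```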

Utilities

The schema_utils module contains utilities that may be useful when writing the Python API of your message schemas.

libravatar_url

fedora_messaging.schema_utils.libravatar_url(email=None, openid=None, size=64, default='retro')[source]

Get the URL to an avatar from libravatar.

Either the user’s email or openid must be provided.

If you want to use Libravatar federation (through DNS), you should install and use the libravatar library instead. Check out the libravatar.libravatar_url() function.

Parameters:
  • email (str) – The user’s email

  • openid (str) – The user’s OpenID

  • size (int) – Size of the avatar in pixels (it’s a square).

  • default (str) – Default avatar to return if not found.

Returns:

The URL to the avatar image.

Return type:

str

Raises:

ValueError – If neither email nor openid is provided.

Exceptions

Exceptions raised by Fedora Messaging.

exception fedora_messaging.exceptions.BadDeclaration(obj_type=None, description=None, reason=None)[source]

Raised when declaring an object in AMQP fails.

Parameters:
  • obj_type (str) – The type of object being declared. One of “binding”, “queue”, or “exchange”.

  • description (dict) – The description of the object.

  • reason (str) – The reason the server gave for rejecting the declaration.

exception fedora_messaging.exceptions.BaseException[source]

The base class for all exceptions raised by fedora_messaging.

exception fedora_messaging.exceptions.ConfigurationException(message)[source]

Raised when there’s an invalid configuration setting.

Parameters:

message (str) – A detailed description of the configuration problem which is presented to the user.

exception fedora_messaging.exceptions.ConnectionException(*args, **kwargs)[source]

Raised if a general connection error occurred.

You may handle this exception by logging it and resending or discarding the message.

exception fedora_messaging.exceptions.ConsumeException[source]

Base class for exceptions related to consuming.

exception fedora_messaging.exceptions.ConsumerCanceled[source]

Raised when the server has canceled the consumer.

This can happen when the queue the consumer is subscribed to is deleted, or when the node the queue is located on fails.

exception fedora_messaging.exceptions.Drop[source]

Consumer callbacks should raise this to indicate they wish the message they are currently processing to be dropped.

exception fedora_messaging.exceptions.HaltConsumer(exit_code=0, reason=None, requeue=False, **kwargs)[source]

Consumer callbacks should raise this exception if they wish the consumer to be shut down.

Parameters:
  • exit_code (int) – The exit code to use when halting.

  • reason (str) – A reason for halting, presented to the user.

  • requeue (bool) – If true, the message is re-queued for later processing.

exception fedora_messaging.exceptions.Nack[source]

Consumer callbacks should raise this to indicate they wish the message they are currently processing to be re-queued.

exception fedora_messaging.exceptions.NoFreeChannels[source]

Raised when a connection has reached its channel limit.

exception fedora_messaging.exceptions.PermissionException(obj_type=None, description=None, reason=None)[source]

Generic permissions exception.

Parameters:
  • obj_type (str) – The type of object being accessed that caused the permission error. May be None if the cause is unknown.

  • description (object) – The description of the object, if any. May be None.

  • reason (str) – The reason the server gave for the permission error, if any. If no reason is supplied by the server, this should be the best guess for what caused the error.

exception fedora_messaging.exceptions.PublishException(reason=None, **kwargs)[source]

Base class for exceptions related to publishing.

exception fedora_messaging.exceptions.PublishForbidden(reason=None, **kwargs)[source]

Raised when the broker rejects the message due to permission errors.

You may handle this exception by logging it and discarding the message, as it is likely a permanent error.

exception fedora_messaging.exceptions.PublishReturned(reason=None, **kwargs)[source]

Raised when the broker rejects and returns the message to the publisher.

You may handle this exception by logging it and resending or discarding the message.

exception fedora_messaging.exceptions.PublishTimeout(reason=None, **kwargs)[source]

Raised when the message could not be published in the given timeout.

This means the message was either never delivered to the broker, or that it was delivered, but never acknowledged by the broker.

exception fedora_messaging.exceptions.ValidationError[source]

This error is raised when a message fails validation with its JSON schema.

This exception can be raised on an incoming or outgoing message. There is no need to catch this exception when publishing; it should warn you during development and testing that you’re trying to publish a message with a different format, and that you should either fix the message or update the schema.

Configuration

conf

fedora_messaging.config.conf = {}

The configuration dictionary used by fedora-messaging and consumers.

DEFAULTS

fedora_messaging.config.DEFAULTS = {'amqp_url': 'amqp://?connection_attempts=3&retry_delay=5', 'bindings': [{'exchange': 'amq.topic', 'queue': '', 'routing_keys': ['#']}], 'callback': None, 'client_properties': {'app': 'Unknown', 'information': 'https://fedora-messaging.readthedocs.io/en/stable/', 'product': 'Fedora Messaging with Pika', 'version': 'fedora_messaging-3.5.0 with pika-1.3.2'}, 'consumer_config': {}, 'exchanges': {'amq.topic': {'arguments': {}, 'auto_delete': False, 'durable': True, 'type': 'topic'}}, 'log_config': {'disable_existing_loggers': False, 'formatters': {'simple': {'format': '[%(name)s %(levelname)s] %(message)s'}}, 'handlers': {'console': {'class': 'logging.StreamHandler', 'formatter': 'simple', 'stream': 'ext://sys.stdout'}}, 'loggers': {'fedora_messaging': {'handlers': ['console'], 'level': 'INFO', 'propagate': False}}, 'root': {'handlers': ['console'], 'level': 'WARNING'}, 'version': 1}, 'passive_declares': False, 'publish_exchange': 'amq.topic', 'publish_priority': None, 'qos': {'prefetch_count': 10, 'prefetch_size': 0}, 'queues': {'': {'arguments': {}, 'auto_delete': True, 'durable': False, 'exclusive': True}}, 'tls': {'ca_cert': None, 'certfile': None, 'keyfile': None}, 'topic_prefix': ''}

The default configuration settings for fedora-messaging. This should not be modified and should be copied with copy.deepcopy().
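The advice to use copy.deepcopy() matters because the defaults contain nested dictionaries, which a shallow copy would still share with the original. The sketch below demonstrates the difference on a small stand-in dictionary; `defaults` here is a hypothetical subset with the same nested shape, not the real fedora_messaging.config.DEFAULTS.

```python
import copy

# A small stand-in with the same nested shape as part of DEFAULTS
# (hypothetical subset used only for this illustration).
defaults = {
    "publish_exchange": "amq.topic",
    "qos": {"prefetch_count": 10, "prefetch_size": 0},
}

shallow = dict(defaults)        # nested dicts are still shared
deep = copy.deepcopy(defaults)  # nested dicts are independent copies

# Changing the deep copy leaves the original untouched.
deep["qos"]["prefetch_count"] = 1
assert defaults["qos"]["prefetch_count"] == 10

# The shallow copy still points at the original nested dictionary.
shallow_shares_nested = shallow["qos"] is defaults["qos"]
```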