Developer Interface

This documentation covers the public interfaces fedora_messaging provides.

Note

Documented interfaces follow Semantic Versioning 2.0.0. Any interface not documented here may change at any time without warning.

Publishing

fedora_messaging.api.publish(message, exchange=None)[source]

Publish a message to an exchange.

This is a synchronous call, meaning that when this function returns, an acknowledgment has been received from the message broker and you can be certain the message was published successfully.

There are some cases where an error occurs despite your message being successfully published, for example if a network partition occurs after the broker receives the message. Republishing after such an error can therefore produce duplicate messages. For complete details, see the Publishing documentation.

>>> from fedora_messaging import api
>>> message = api.Message(body={'Hello': 'world'}, topic='Hi')
>>> api.publish(message)

If an attempt to publish fails because the broker rejects the message, it is not retried. Connection attempts to the broker can be configured using the “connection_attempts” and “retry_delay” options in the broker URL. See pika.connection.URLParameters for details.

Parameters:
Raises:
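
Because a publish can be reported as failed even though the broker accepted the message, consumers may occasionally see the same message twice. One way to cope is to make the consumer callback idempotent. The following is a minimal sketch under stated assumptions, not part of fedora_messaging: deduplicate and FakeMessage are hypothetical names, and the only library detail relied on is the documented id attribute of a message.

```python
from collections import OrderedDict


def deduplicate(callback, max_ids=1000):
    """Wrap a consumer callback so each message id is processed only once.

    Hypothetical helper, not part of fedora_messaging. It remembers the
    most recent ``max_ids`` message ids, oldest first.
    """
    seen = OrderedDict()

    def wrapper(message):
        if message.id in seen:
            return  # duplicate delivery: skip it
        callback(message)
        # Record the id only after the callback succeeded.
        seen[message.id] = True
        if len(seen) > max_ids:
            seen.popitem(last=False)  # forget the oldest id

    return wrapper


class FakeMessage:
    """Stand-in for a real message; only the ``id`` attribute is used."""

    def __init__(self, id):
        self.id = id


processed = []
handler = deduplicate(lambda message: processed.append(message.id))
for msg_id in ["a", "b", "a"]:
    handler(FakeMessage(msg_id))
print(processed)  # ['a', 'b']
```

The in-memory cache is lost on restart, so a long-running service would likely need a persistent store instead.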

Subscribing

fedora_messaging.api.consume(callback, bindings=None)[source]

Start a message consumer that executes the provided callback when messages are received.

This API is blocking and will not return until the process receives a signal from the operating system.

The callback receives a single positional argument, the message:

>>> from fedora_messaging import api
>>> def my_callback(message):
...     print(message)
>>> bindings = [{'exchange': 'amq.topic', 'queue': 'demo', 'routing_keys': ['#']}]
>>> api.consume(my_callback, bindings=bindings)

For complete documentation on writing consumers, see the Consumers documentation.

Parameters:
  • callback (callable) – A callable object that accepts one positional argument, a Message.
  • bindings (dict or list of dict) – The bindings to use when consuming. This should be the same format as the bindings configuration.
Raises:
  • fedora_messaging.exceptions.HaltConsumer – If the consumer requests that it be stopped.
  • ValueError – If the provided callback is neither a function nor a class that implements __call__, or if bindings isn’t a dictionary or a list of dictionaries.
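
The argument rules above can be checked before starting a consumer. The sketch below is only an illustration of the documented ValueError cases; check_consume_args is a hypothetical helper and is not how the library itself performs the validation.

```python
def check_consume_args(callback, bindings=None):
    """Return bindings as a list of dicts, or raise ValueError.

    Hypothetical pre-flight check; it mirrors the documented rules only.
    """
    if not callable(callback):
        raise ValueError("callback must be a function or implement __call__")
    if bindings is None:
        return None  # left for the library to handle
    if isinstance(bindings, dict):
        bindings = [bindings]  # a single binding is wrapped in a list
    if not isinstance(bindings, list) or not all(
        isinstance(binding, dict) for binding in bindings
    ):
        raise ValueError("bindings must be a dict or a list of dicts")
    return bindings


binding = {"exchange": "amq.topic", "queue": "demo", "routing_keys": ["#"]}
print(check_consume_args(print, binding))
```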

Signals

Signals emitted by the fedora_messaging APIs. Each one is a blinker.base.Signal instance.

fedora_messaging.api.pre_publish_signal = <blinker.base.NamedSignal object at 0x7f4724b0f550; 'pre_publish'>

A signal triggered before the message is published. The signal handler should accept a single keyword argument, message, which is the instance of the fedora_messaging.message.Message being sent. It is acceptable to mutate the message, but the validate method will be called on it after this signal.

fedora_messaging.api.publish_signal = <blinker.base.NamedSignal object at 0x7f4723c72400; 'publish_success'>

A signal triggered after a message is published successfully. The signal handler should accept a single keyword argument, message, which is the instance of the fedora_messaging.message.Message that was sent.

fedora_messaging.api.publish_failed_signal = <blinker.base.NamedSignal object at 0x7f4723c72358; 'publish_failed_signal'>

A signal triggered after a message fails to publish for some reason. The signal handler should accept two keyword arguments: message, which is the instance of the fedora_messaging.message.Message that failed to be sent, and error, the exception that was raised.
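
As a rough illustration of the handler signatures described above, the sketch below defines two receivers and calls one directly, the way blinker invokes a receiver (sender first, everything else as keyword arguments). The function names are hypothetical, and the connect calls are shown as comments so the sketch runs without the library installed.

```python
failures = []


def log_outgoing(sender, message):
    """Receiver for pre_publish_signal: gets the message being sent."""
    print("about to publish:", message)


def record_failure(sender, message, error):
    """Receiver for publish_failed_signal: gets the message and the exception."""
    failures.append((message, error))


# With fedora_messaging installed, the receivers would be registered with:
#     from fedora_messaging import api
#     api.pre_publish_signal.connect(log_outgoing)
#     api.publish_failed_signal.connect(record_failure)

# Simulate a signal being sent: the sender is positional, the rest are
# keyword arguments. The dict stands in for a real Message instance.
record_failure(None, message={"Hello": "world"}, error=RuntimeError("boom"))
print(len(failures))  # 1
```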

Message Schemas

This module defines the base class of message objects and keeps a registry of known message implementations. This registry is populated from Python entry points in the “fedora.messages” group.

To implement your own message schema, simply create a class that inherits the Message class, and add an entry point in your Python package under the “fedora.messages” group. For example, an entry point for the Message schema would be:

entry_points = {
    'fedora.messages': [
        'base.message=fedora_messaging.message:Message'
    ]
}

The entry point name must be unique to your application and is used to map messages to your message class, so it’s best to prefix it with your application name (e.g. bodhi.new_update_messageV1). When publishing, the Fedora Messaging library will add a header with the entry point name of the class used so the consumer can locate the correct schema.

Since every client needs to have the message schema installed, you should define this class in a small Python package of its own.

class fedora_messaging.message.Message(body=None, headers=None, topic=None, properties=None, severity=None)[source]

Messages are simply JSON-encoded objects. This allows message authors to define a schema and implement Python methods to abstract the raw message from the user. This allows the schema to change and evolve without breaking the user-facing API.

A message class includes a topic. This topic is used by message consumers to filter what messages they receive. Topics should be a string of words separated by ‘.’ characters, with a length limit of 255 bytes. Because of this limit, it is best to avoid non-ASCII characters as well as variable-length components where the total size of the topic would exceed 255 bytes.
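
To see why non-ASCII characters and variable-length components are risky, the sketch below checks a topic against the 255-byte limit. It assumes the topic is encoded as UTF-8 on the wire; topic_fits is a hypothetical helper, not a library function.

```python
MAX_TOPIC_BYTES = 255


def topic_fits(topic):
    """Return True if the UTF-8 encoded topic is at most 255 bytes long."""
    return len(topic.encode("utf-8")) <= MAX_TOPIC_BYTES


print(topic_fits("org.example.build.finished"))  # True
# 128 characters, but each "é" takes two bytes in UTF-8: 256 bytes in total.
print(topic_fits("é" * 128))  # False
```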

id

six.text_type – The message id as a unicode string.

topic

six.text_type – The message topic as a unicode string.

headers_schema

dict – A JSON schema to be used with jsonschema.validate() to validate the message headers.

body_schema

dict – A JSON schema to be used with jsonschema.validate() to validate the message body.
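
For orientation, a body_schema value might look like the dictionary below. The field names and the draft version in "$schema" are assumptions made for illustration. The missing_required helper is a hypothetical stand-in that mirrors only the "required" keyword so the sketch runs without extra packages; real validation is performed with jsonschema.validate().

```python
body_schema = {
    "$schema": "http://json-schema.org/draft-04/schema#",
    "description": "Hypothetical schema for a 'new item' message body",
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "count": {"type": "integer"},
    },
    "required": ["name"],
}


def missing_required(body, schema):
    """List the required keys that are absent from the body.

    Stand-in for illustration only; it does not check types or any
    other JSON Schema keyword.
    """
    return [key for key in schema.get("required", []) if key not in body]


print(missing_required({"name": "widget", "count": 3}, body_schema))  # []
print(missing_required({"count": 3}, body_schema))  # ['name']
```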

severity

int – An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be DEBUG, INFO, WARNING, or ERROR.

queue

str – The name of the queue this message arrived through.

Parameters:
  • headers (dict) – A set of message headers. Consult the headers schema for expected keys and values.
  • body (dict) – The message body. Consult the body schema for expected keys and values.
  • topic (six.text_type) – The message topic as a unicode string. If this is not provided, the default topic for the class is used.
  • properties (pika.BasicProperties) – The AMQP properties. If this is not provided, they will be generated.
agent_avatar

A URL to the avatar of the user who caused the action.

Returns:The URL to the user’s avatar.
Return type:str or None
app_icon

A URL to the icon of the application that generated the message.

Returns:The URL to the app’s icon.
Return type:str or None
containers

List of containers affected by the action that generated this message.

Returns:A list of affected container names.
Return type:list(str)
flatpaks

List of flatpaks affected by the action that generated this message.

Returns:A list of affected flatpak names.
Return type:list(str)
modules

List of modules affected by the action that generated this message.

Returns:A list of affected module names.
Return type:list(str)
packages

List of packages affected by the action that generated this message.

Returns:A list of affected package names.
Return type:list(str)
summary

A short, human-readable representation of this message.

This should provide a short summary of the message, much like the subject line of an email.

The default implementation is to simply return the message topic.

url

A URL to the action that caused this message to be emitted.

Returns:A relevant URL.
Return type:str or None
usernames

List of users affected by the action that generated this message.

Returns:A list of affected usernames.
Return type:list(str)
validate()[source]

Validate the headers and body with the message schema, if any.

In addition to the user-provided schema, all messages are checked against the base schema, which requires certain message headers and that the body be a JSON object.

Warning

This method should not be overridden by sub-classes.

Raises:
  • jsonschema.ValidationError – If either the message headers or the message body are invalid.
  • jsonschema.SchemaError – If either the message header schema or the message body schema are invalid.
fedora_messaging.message.get_class(schema_name)[source]

Retrieve the message class associated with the schema name.

If no match is found, the default schema is returned and a warning is logged.

Parameters:schema_name (six.text_type) – The name of the Message sub-class; this is typically the Python path.
Returns:A sub-class of Message to create the message from.
Return type:Message
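
The lookup-with-fallback behaviour described above can be pictured as a dictionary lookup. The sketch below is an illustration only and not the library's implementation: the registry contents, the class names, and lookup_class are all hypothetical.

```python
import logging

_log = logging.getLogger(__name__)


class Message:
    """Stand-in for the default message class."""


class NewItemV1(Message):
    """Hypothetical schema class."""


# Hypothetical registry mapping entry point names to classes.
_registry = {"myapp.new_item.v1": NewItemV1}


def lookup_class(schema_name):
    """Return the registered class, or the default class with a warning."""
    try:
        return _registry[schema_name]
    except KeyError:
        _log.warning("No schema named %r; using the default class", schema_name)
        return Message


print(lookup_class("myapp.new_item.v1").__name__)  # NewItemV1
print(lookup_class("unknown.schema").__name__)  # Message
```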

Message Severity

Each message can have a severity associated with it. The severity is used by applications like the notification service to determine what messages to send to users. The severity can be set at the class level, or on a message-by-message basis. The following are valid severity levels:

fedora_messaging.message.DEBUG = 10

Indicates the message is for debugging or is otherwise very low priority. Users will not be notified unless they’ve explicitly requested DEBUG level messages.

fedora_messaging.message.INFO = 20

Indicates the message is informational. End users will not receive notifications for these messages by default. For example, automated tests passed for their package.

fedora_messaging.message.WARNING = 30

Indicates a problem or an otherwise important event. By default, users are notified of these messages when they pertain to packages they are associated with. For example, one or more automated tests failed against their package.

fedora_messaging.message.ERROR = 40

Indicates a critically important message that users should act upon as soon as possible. For example, their package no longer builds.
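
Since the levels are plain integers, an application can compare them against a threshold. The sketch below redefines the four documented values locally so it runs on its own; messages_to_notify and the topics are hypothetical, and the WARNING default only loosely mirrors the notification behaviour described above.

```python
# Values as documented above.
DEBUG, INFO, WARNING, ERROR = 10, 20, 30, 40


def messages_to_notify(messages, threshold=WARNING):
    """Return the topics of messages at or above the severity threshold."""
    return [topic for topic, severity in messages if severity >= threshold]


incoming = [
    ("org.example.test.passed", INFO),
    ("org.example.test.failed", WARNING),
    ("org.example.build.broken", ERROR),
    ("org.example.trace", DEBUG),
]
print(messages_to_notify(incoming))
# ['org.example.test.failed', 'org.example.build.broken']
```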

Exceptions

Exceptions raised by Fedora Messaging.

exception fedora_messaging.exceptions.BadDeclaration(obj_type, description, reason)[source]

Raised when declaring an object in AMQP fails.

Parameters:
  • obj_type (str) – The type of object being declared. One of “binding”, “queue”, or “exchange”.
  • description (dict) – The description of the object.
  • reason (str) – The reason the server gave for rejecting the declaration.
exception fedora_messaging.exceptions.BaseException[source]

The base class for all exceptions raised by fedora_messaging.

exception fedora_messaging.exceptions.ConfigurationException(message)[source]

Raised when there’s an invalid configuration setting.

Parameters:message (str) – A detailed description of the configuration problem which is presented to the user.
exception fedora_messaging.exceptions.ConnectionException(reason=None, **kwargs)[source]

Raised if a general connection error occurred.

You may handle this exception by logging it and resending or discarding the message.

exception fedora_messaging.exceptions.ConsumeException[source]

Base class for exceptions related to consuming.

exception fedora_messaging.exceptions.Drop[source]

Consumer callbacks should raise this to indicate they wish the message they are currently processing to be dropped.

exception fedora_messaging.exceptions.HaltConsumer(exit_code=0, reason=None, requeue=False, **kwargs)[source]

Consumer callbacks should raise this exception if they wish the consumer to be shut down.

Parameters:
  • exit_code (int) – The exit code to use when halting.
  • reason (str) – A reason for halting, presented to the user.
  • requeue (bool) – If true, the message is re-queued for later processing.
exception fedora_messaging.exceptions.Nack[source]

Consumer callbacks should raise this to indicate they wish the message they are currently processing to be re-queued.

exception fedora_messaging.exceptions.NoFreeChannels[source]

Raised when a connection has reached its channel limit.

exception fedora_messaging.exceptions.PublishException(reason=None, **kwargs)[source]

Base class for exceptions related to publishing.

exception fedora_messaging.exceptions.PublishReturned(reason=None, **kwargs)[source]

Raised when the broker rejects and returns the message to the publisher.

You may handle this exception by logging it and resending or discarding the message.
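
The "log it and resend or discard" advice for PublishReturned and ConnectionException could be sketched as follows. publish_with_retry is a hypothetical helper, and the built-in ConnectionError and flaky_publish stand in for the library's exceptions and publish function so the example runs on its own.

```python
import logging
import time

_log = logging.getLogger(__name__)


def publish_with_retry(publish, message, retriable, attempts=3, delay=0.0):
    """Call publish(message), retrying on the given exception types.

    Returns True once the message is published, or False if it was
    discarded after ``attempts`` failures.
    """
    for attempt in range(1, attempts + 1):
        try:
            publish(message)
            return True
        except retriable as exc:
            _log.warning("publish attempt %d/%d failed: %s", attempt, attempts, exc)
            if attempt < attempts:
                time.sleep(delay)
    return False


calls = []


def flaky_publish(message):
    """Stand-in publisher that fails twice before succeeding."""
    calls.append(message)
    if len(calls) < 3:
        raise ConnectionError("broker unreachable")


ok = publish_with_retry(flaky_publish, {"Hello": "world"}, retriable=(ConnectionError,))
print(ok, len(calls))  # True 3
```

With the library, publish would be api.publish and retriable would be (PublishReturned, ConnectionException). Resending after a connection error can produce duplicates, so consumers should tolerate them.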

exception fedora_messaging.exceptions.ValidationError[source]

This error is raised when a message fails validation with its JSON schema.

This exception can be raised on an incoming or outgoing message. There is no need to catch this exception when publishing: it is meant to warn you during development and testing that you’re trying to publish a message with a different format, and that you should either fix the message or update the schema.

Twisted

In addition to the synchronous API, a Twisted API is provided for applications that need an asynchronous API. This API requires Twisted 16.1.0 or greater.

Protocol

The core Twisted interface: a protocol represents a specific connection to the AMQP broker. When automatic reconnecting is required, the fedora_messaging.twisted.factory.FedoraMessagingFactory class should be used to produce configured instances of FedoraMessagingProtocol.

FedoraMessagingProtocol is based on pika’s Twisted Protocol. It implements message schema validation when publishing and subscribing, as well as a few convenience methods for declaring multiple objects at once.

For an overview of Twisted clients, see the Twisted client documentation.

class fedora_messaging.twisted.protocol.FedoraMessagingProtocol(parameters, confirms=True)[source]

A Twisted Protocol for the Fedora Messaging system.

This protocol builds on the generic pika AMQP protocol to add calls specific to the Fedora Messaging implementation.

Parameters:
  • parameters (pika.ConnectionParameters) – The connection parameters.
  • confirms (bool) – If True, all outgoing messages will require a confirmation from the server, and the Deferred returned from the publish call will wait for that confirmation.
bind_queues(bindings)[source]

Declare a set of bindings between queues and exchanges.

Parameters:

bindings (list of dict) – A list of binding definitions. Each dictionary must contain the “queue” key whose value is the name of the queue to create the binding on, as well as the “exchange” key whose value should be the name of the exchange to bind to. Additional acceptable keys are any keyword arguments accepted by pika.channel.Channel.queue_bind().

Raises:
  • NoFreeChannels – If there are no available channels on this connection. If this occurs, you can either reduce the number of consumers on this connection or create an additional connection.
  • BadDeclaration – If a binding could not be declared. This can occur if the queue or exchange don’t exist, or if they do, but the current user does not have permissions to create bindings.
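
A bindings list for bind_queues() might look like the one below. The queue name is made up, routing_key is one of the keyword arguments accepted by queue_bind(), and missing_binding_keys is a hypothetical helper that checks only the two required keys before anything is sent to the broker.

```python
REQUIRED_KEYS = {"queue", "exchange"}


def missing_binding_keys(bindings):
    """Return (index, missing keys) pairs for incomplete binding definitions."""
    problems = []
    for index, binding in enumerate(bindings):
        missing = REQUIRED_KEYS - set(binding)
        if missing:
            problems.append((index, sorted(missing)))
    return problems


bindings = [
    {"queue": "demo", "exchange": "amq.topic", "routing_key": "#"},
    {"queue": "demo"},  # incomplete: no exchange to bind to
]
print(missing_binding_keys(bindings))  # [(1, ['exchange'])]
```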
cancel(queue)[source]

Cancel the consumer for a queue.

Parameters:queue (str) – The name of the queue the consumer is subscribed to.
Returns:A Deferred that fires when the consumer is canceled, or None if the consumer was already canceled. Wrap the call in defer.maybeDeferred() to always receive a Deferred.
Return type:defer.Deferred or None
connectionReady(res=None)[source]

Callback invoked when the AMQP connection is ready.

This API is not meant for users.

consume(callback, queue)[source]

Register a message consumer that executes the provided callback when messages are received.

The queue must exist prior to calling this method. If a consumer already exists for the given queue, the callback is simply updated and any new messages for that consumer use the new callback.

If resumeProducing() has not been called when this method is called, it will be called for you.

Parameters:
  • callback (callable) – The callback to invoke when a message is received.
  • queue (str) – The name of the queue to consume from.
Returns:

A namedtuple that identifies this consumer.

Return type:

Consumer

Raises:
  • NoFreeChannels – If there are no available channels on this connection. If this occurs, you can either reduce the number of consumers on this connection or create an additional connection.
declare_exchanges(exchanges)[source]

Declare a number of exchanges at once.

This simply wraps the pika.channel.Channel.exchange_declare() method and deals with error handling and channel allocation.

Parameters:

exchanges (list of dict) –

A list of dictionaries, where each dictionary represents an exchange. Each dictionary can have the following keys:

  • exchange (str): The exchange’s name
  • exchange_type (str): The type of the exchange (“direct”, “topic”, etc)
  • passive (bool): If true, this will just assert that the exchange exists, but won’t create it if it doesn’t.
  • durable (bool): Whether or not the exchange is durable
  • arguments (dict): Extra arguments for the exchange’s creation.

Raises:
  • NoFreeChannels – If there are no available channels on this connection. If this occurs, you can either reduce the number of consumers on this connection or create an additional connection.
  • BadDeclaration – If an exchange could not be declared. This can occur if the exchange already exists, but its type does not match (e.g. it is declared as a “topic” exchange, but exists as a “direct” exchange). It can also occur if it does not exist, but the current user does not have permissions to create the object.
declare_queues(queues)[source]

Declare a list of queues.

Parameters:

queues (list of dict) –

A list of dictionaries, where each dictionary represents a queue. Each dictionary can have the following keys:

  • queue (str): The name of the queue
  • passive (bool): If true, this will just assert that the queue exists, but won’t create it if it doesn’t.
  • durable (bool): Whether or not the queue is durable
  • exclusive (bool): Whether or not the queue is exclusive to this connection.
  • auto_delete (bool): Whether or not the queue should be automatically deleted once this connection ends.
  • arguments (dict): Additional arguments for the creation of the queue.

Raises:
  • NoFreeChannels – If there are no available channels on this connection. If this occurs, you can either reduce the number of consumers on this connection or create an additional connection.
  • BadDeclaration – If a queue could not be declared. This can occur if the queue already exists, but its settings do not match (e.g. it is declared as a durable queue, but exists as a non-durable queue). It can also occur if it does not exist, but the current user does not have permissions to create the object.
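
The exchange and queue definitions accepted by declare_exchanges() and declare_queues() are plain dictionaries. The sketch below builds one list of each and flags keys that are not among those documented above, which helps catch typos early. The queue names are made up, and unknown_keys is a hypothetical helper rather than a library function.

```python
EXCHANGE_KEYS = {"exchange", "exchange_type", "passive", "durable", "arguments"}
QUEUE_KEYS = {"queue", "passive", "durable", "exclusive", "auto_delete", "arguments"}


def unknown_keys(definitions, allowed):
    """For each definition, list the keys that are not in ``allowed``."""
    return [sorted(set(definition) - allowed) for definition in definitions]


exchanges = [{"exchange": "amq.topic", "exchange_type": "topic", "durable": True}]
queues = [
    {"queue": "demo", "durable": True, "auto_delete": False, "arguments": {}},
    {"queue": "scratch", "exclusive": True, "autodelete": True},  # misspelled key
]
print(unknown_keys(exchanges, EXCHANGE_KEYS))  # [[]]
print(unknown_keys(queues, QUEUE_KEYS))  # [[], ['autodelete']]
```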
pauseProducing()[source]

Pause the reception of messages by canceling all existing consumers. This does not disconnect from the server.

Message reception can be resumed with resumeProducing().

Returns:A Deferred that fires when production is paused.
Return type:Deferred
publish(message, exchange)[source]

Publish a fedora_messaging.message.Message to an exchange on the message broker.

Parameters:
  • message (message.Message) – The message to publish.
  • exchange (str) – The name of the AMQP exchange to publish to
Raises:
  • NoFreeChannels – If there are no available channels on this connection. If this occurs, you can either reduce the number of consumers on this connection or create an additional connection.
  • PublishReturned – If the broker rejected the message. This can happen if there are resource limits that have been reached (full disk, for example) or if the message will be routed to 0 queues and the exchange is set to reject such messages.
resumeProducing()[source]

Starts or resumes the retrieval of messages from the server queue.

This method starts receiving messages from the server; they are passed to the consumer callback.

Note

This is called automatically when consume() is called, so users should not need to call this unless pauseProducing() has been called.

Returns:A Deferred that fires when production is ready to start.
Return type:defer.Deferred
stopProducing()[source]

Stop producing messages and disconnect from the server.

Returns:A Deferred that fires when production is stopped.
Return type:Deferred
class fedora_messaging.twisted.protocol.Consumer(tag, queue, callback, channel)

A namedtuple that represents an AMQP consumer.

  • The tag field is the consumer’s AMQP tag (str).
  • The queue field is the name of the queue it’s consuming from (str).
  • The callback field is the function called for each message (a callable).
  • The channel is the AMQP channel used for the consumer (pika.adapters.twisted_connection.TwistedChannel).
callback

Alias for field number 2

channel

Alias for field number 3

queue

Alias for field number 1

tag

Alias for field number 0

Factory

A Twisted Factory for creating and configuring instances of the FedoraMessagingProtocol.

A factory is used to implement automatic re-connections by producing protocol instances (connections) on demand. Twisted uses factories for its services APIs.

See the Twisted client documentation for more information.

class fedora_messaging.twisted.factory.FedoraMessagingFactory(parameters, confirms=True, exchanges=None, queues=None, bindings=None)[source]

Reconnecting factory for the Fedora Messaging protocol.

buildProtocol(addr)[source]

Create the Protocol instance.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

cancel(queue)[source]

Cancel the consumer for a queue.

This removes the consumer from the list of consumers to be configured for every connection.

Parameters:queue (str) – The name of the queue the consumer is subscribed to.
Returns:Either a Deferred that fires when the consumer is canceled, or None if the consumer was already canceled. Wrap the call in defer.maybeDeferred() to always receive a Deferred.
Return type:defer.Deferred or None
clientConnectionFailed(connector, reason)[source]

Called when the client has failed to connect to the broker.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

clientConnectionLost(connector, reason)[source]

Called when the connection to the broker has been lost.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

consume(callback, queue)[source]

Register a new consumer.

This consumer will be configured for every protocol this factory produces so it will be reconfigured on network failures. If a connection is already active, the consumer will be added to it.

Parameters:
  • callback (callable) – The callback to invoke when a message arrives.
  • queue (str) – The name of the queue to consume from.
protocol

alias of fedora_messaging.twisted.protocol.FedoraMessagingProtocol

publish(message, exchange=None)[source]

Publish a fedora_messaging.message.Message to an exchange on the message broker. This call will survive connection failures and try until it succeeds or is canceled.

Parameters:
Returns:

A deferred that fires when the message is published.

Return type:

defer.Deferred

Raises:
  • PublishReturned – If the published message is rejected by the broker.
  • ConnectionException – If a connection error occurs while publishing. Calling this method again will wait for the next connection and publish when it is available.
startedConnecting(connector)[source]

Called when the connection to the broker has started.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

stopFactory()[source]

Stop the factory.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

stopTrying()[source]

Stop trying to reconnect to the broker.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

whenConnected()[source]

Get the next connected protocol instance.

Returns:A deferred that results in a connected FedoraMessagingProtocol.
Return type:defer.Deferred

Service

Twisted Service to start and stop the Fedora Messaging Twisted Factory.

This Service makes it easier to build a Twisted application that embeds a Fedora Messaging component. See the verify_missing service in fedmsg-migration-tools for a use case.

See https://twistedmatrix.com/documents/current/core/howto/application.html

class fedora_messaging.twisted.service.FedoraMessagingService(amqp_url=None, exchanges=None, queues=None, bindings=None, consumers=None)[source]

A Twisted service to connect to the Fedora Messaging broker.

Parameters:
  • on_message (callable|None) – Callback that will be passed each incoming message. If None, no message consuming is set up.
  • amqp_url (str) – URL to use for the AMQP server.
  • exchanges (list of dicts) – List of exchanges to declare at the start of every connection. Each dictionary is passed to pika.channel.Channel.exchange_declare() as keyword arguments, so any parameter to that method is a valid key.
  • queues (list of dicts) – List of queues to declare at the start of every connection. Each dictionary is passed to pika.channel.Channel.queue_declare() as keyword arguments, so any parameter to that method is a valid key.
  • bindings (list of dicts) – A list of bindings to be created between queues and exchanges. Each dictionary is passed to pika.channel.Channel.queue_bind(). The “queue” and “exchange” keys are required.
  • consumers (dict) – A dictionary where each key is a queue name and the value is a callable object to handle messages on that queue. Consumers will be set up after each connection is established so they will survive networking issues.
factoryClass

alias of fedora_messaging.twisted.factory.FedoraMessagingFactory