Developer Interface

This documentation covers the public interfaces fedora_messaging provides.

Note

Documented interfaces follow Semantic Versioning 2.0.0. Any interface not documented here may change at any time without warning.

Publishing

publish

fedora_messaging.api.publish(message, exchange=None, timeout=30)[source]

Publish a message to an exchange.

This is a synchronous call, meaning that when this function returns, an acknowledgment has been received from the message broker and you can be certain the message was published successfully.

There are some cases where an error occurs despite your message being successfully published. For example, if a network partition occurs after the message is received by the broker. Therefore, you may publish duplicate messages. For complete details, see the Publishing documentation.

>>> from fedora_messaging import api
>>> message = api.Message(body={'Hello': 'world'}, topic='Hi')
>>> api.publish(message)

If an attempt to publish fails because the broker rejects the message, it is not retried. Connection attempts to the broker can be configured using the “connection_attempts” and “retry_delay” options in the broker URL. See pika.connection.URLParameters for details.

Parameters:
  • message (message.Message) – The message to publish.
  • exchange (str) – The name of the AMQP exchange to publish to; defaults to publish_exchange
  • timeout (int) – The maximum time in seconds to wait before giving up attempting to publish the message. If the timeout is reached, a PublishTimeout exception is raised.
Raises:

Subscribing

twisted_consume

fedora_messaging.api.twisted_consume(callback, bindings=None, queues=None)[source]

Start a consumer using the provided callback and run it using the Twisted event loop (reactor).

Note

Callbacks run in a Twisted-managed thread pool using the twisted.internet.threads.deferToThread() API to avoid them blocking the event loop. If you wish to use Twisted APIs in your callback you must use the twisted.internet.threads.blockingCallFromThread() or twisted.internet.interfaces.IReactorFromThreads APIs.

This API expects the caller to start the reactor.

Parameters:
  • callback (callable) – A callable object that accepts one positional argument, a Message or a class object that implements the __call__ method. The class will be instantiated before use.
  • bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.
  • queues (dict) – The queue to declare and consume from. Each key in this dictionary should be a queue name to declare, and each value should be a dictionary with the “durable”, “auto_delete”, “exclusive”, and “arguments” keys.
Raises:

ValueError – If the callback, bindings, or queues are invalid.

Returns:

A deferred that fires with the list of one or more Consumer objects. Each consumer object has a Consumer.result instance variable that is a Deferred that fires or errors when the consumer halts. Note that this API is meant to survive network problems, so consuming will continue until Consumer.cancel() is called or a fatal server error occurs. The deferred returned by this function may error back with a fedora_messaging.exceptions.BadDeclaration if queues or bindings cannot be declared on the broker, a fedora_messaging.exceptions.PermissionException if the user doesn’t have access to the queue, or fedora_messaging.exceptions.ConnectionException if the TLS or AMQP handshake fails.

Return type:

twisted.internet.defer.Deferred

Consumer

class fedora_messaging.api.Consumer(queue=None, callback=None)[source]

Represents a Twisted AMQP consumer and is returned from the call to fedora_messaging.api.twisted_consume().

queue

The AMQP queue this consumer is subscribed to.

Type:str
callback

The callback to run when a message arrives.

Type:callable
result

A deferred that runs the callbacks if the consumer exits gracefully after being canceled by a call to Consumer.cancel() and errbacks if the consumer stops for any other reason. The reasons a consumer could stop are: a fedora_messaging.exceptions.PermissionExecption if the consumer does not have permissions to read from the queue it is subscribed to, a HaltConsumer is raised by the consumer indicating it wishes to halt, an unexpected Exception is raised by the consumer, or if the consumer is canceled by the server which happens if the queue is deleted by an administrator or if the node the queue lives on fails.

Type:twisted.internet.defer.Deferred
cancel()[source]

Cancel the consumer and clean up resources associated with it. Consumers that are canceled are allowed to finish processing any messages before halting.

Returns:A deferred that fires when the consumer has finished processing any message it was in the middle of and has been successfully canceled.
Return type:defer.Deferred

consume

fedora_messaging.api.consume(callback, bindings=None, queues=None)[source]

Start a message consumer that executes the provided callback when messages are received.

This API is blocking and will not return until the process receives a signal from the operating system.

Warning

This API is runs the callback in the IO loop thread. This means if your callback could run for a length of time near the heartbeat interval, which is likely on the order of 60 seconds, the broker will kill the TCP connection and the message will be re-delivered on start-up.

For now, use the twisted_consume() API which runs the callback in a thread and continues to handle AMQP events while the callback runs if you have a long-running callback.

The callback receives a single positional argument, the message:

>>> from fedora_messaging import api
>>> def my_callback(message):
...     print(message)
>>> bindings = [{'exchange': 'amq.topic', 'queue': 'demo', 'routing_keys': ['#']}]
>>> queues = {
...     "demo": {"durable": False, "auto_delete": True, "exclusive": True, "arguments": {}}
... }
>>> api.consume(my_callback, bindings=bindings, queues=queues)

If the bindings and queue arguments are not provided, they will be loaded from the configuration.

For complete documentation on writing consumers, see the Consumers documentation.

Parameters:
  • callback (callable) – A callable object that accepts one positional argument, a Message or a class object that implements the __call__ method. The class will be instantiated before use.
  • bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.
  • queues (dict) – The queue or queues to declare and consume from. This should be in the same format as the queues configuration dictionary where each key is a queue name and each value is a dictionary of settings for that queue.
Raises:
  • fedora_messaging.exceptions.HaltConsumer – If the consumer requests that it be stopped.
  • ValueError – If the consumer provide callback that is not a class that implements __call__ and is not a function, if the bindings argument is not a dict or list of dicts with the proper keys, or if the queues argument isn’t a dict with the proper keys.

Signals

Signals sent by fedora_messaging APIs using blinker.base.Signal signals.

pre_publish_signal

fedora_messaging.api.pre_publish_signal = <blinker.base.NamedSignal object at 0x7f8ab989aa58; 'pre_publish'>

A signal triggered before the message is published. The signal handler should accept a single keyword argument, message, which is the instance of the fedora_messaging.message.Message being sent. It is acceptable to mutate the message, but the validate method will be called on it after this signal.

publish_signal

fedora_messaging.api.publish_signal = <blinker.base.NamedSignal object at 0x7f8ab989a6d8; 'publish_success'>

A signal triggered after a message is published successfully. The signal handler should accept a single keyword argument, message, which is the instance of the fedora_messaging.message.Message that was sent.

publish_failed_signal

fedora_messaging.api.publish_failed_signal = <blinker.base.NamedSignal object at 0x7f8ab98ebba8; 'publish_failed_signal'>

A signal triggered after a message fails to publish for some reason. The signal handler should accept two keyword argument, message, which is the instance of the fedora_messaging.message.Message that failed to be sent, and error, the exception that was raised.

Message Schemas

This module defines the base class of message objects and keeps a registry of known message implementations. This registry is populated from Python entry points in the “fedora.messages” group.

To implement your own message schema, simply create a class that inherits the Message class, and add an entry point in your Python package under the “fedora.messages” group. For example, an entry point for the Message schema would be:

entry_points = {
    'fedora.messages': [
        'base.message=fedora_messaging.message:Message'
    ]
}

The entry point name must be unique to your application and is used to map messages to your message class, so it’s best to prefix it with your application name (e.g. bodhi.new_update_messageV1). When publishing, the Fedora Messaging library will add a header with the entry point name of the class used so the consumer can locate the correct schema.

Since every client needs to have the message schema installed, you should define this class in a small Python package of its own.

Message

class fedora_messaging.message.Message(body=None, headers=None, topic=None, properties=None, severity=None)[source]

Messages are simply JSON-encoded objects. This allows message authors to define a schema and implement Python methods to abstract the raw message from the user. This allows the schema to change and evolve without breaking the user-facing API.

There are a number of properties that are intended to be overridden by users. These fields are used to sort messages for notifications or are used to create human-readable versions of the messages. Properties that are intended for this purpose are noted in their attribute documentation below.

Parameters:
  • headers (dict) – A set of message headers. Consult the headers schema for expected keys and values.
  • body (dict) – The message body. Consult the body schema for expected keys and values. This dictionary must be JSON-serializable by the default serializer.
  • topic (six.text_type) – The message topic as a unicode string. If this is not provided, the default topic for the class is used. See the attribute documentation below for details.
  • properties (pika.BasicProperties) – The AMQP properties. If this is not provided, they will be generated. Most users should not need to provide this, but it can be useful in testing scenarios.
  • severity (int) – An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be DEBUG, INFO, WARNING, or ERROR. The default is INFO, and can be set as a class attribute or on an instance-by-instance basis.
id

The message id as a unicode string. This attribute is automatically generated and set by the library and users should only set it themselves in testing scenarios.

Type:six.text_type
topic

The message topic as a unicode string. The topic is used by message consumers to filter what messages they receive. Topics should be a string of words separated by ‘.’ characters, with a length limit of 255 bytes. Because of this byte limit, it is best to avoid non-ASCII character. Topics should start general and get more specific each word. For example: “bodhi.update.kernel” is a possible topic. “bodhi” identifies the application, “update” identifies the message, and “kernel” identifies the package in the update. This can be set at a class level or on a instance level. Dynamic, specific topics that allow for fine-grain filtering are preferred.

Type:six.text_type
headers_schema

A JSON schema to be used with jsonschema.validate() to validate the message headers. For most users, the default definition should suffice.

Type:dict
body_schema

A JSON schema to be used with jsonschema.validate() to validate the message body. The body_schema is retrieved on a message instance so it is not required to be a class attribute, although this is a convenient approach. Users are also free to write the JSON schema as a file and load the file from the filesystem or network if they prefer.

Type:dict
body

The message body as a Python dictionary. This is validated by the body schema before publishing and before consuming.

Type:dict
severity

An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be DEBUG, INFO, WARNING, or ERROR. The default is INFO, and can be set as a class attribute or on an instance-by-instance basis.

Type:int
queue

The name of the queue this message arrived through. This attribute is set automatically by the library and users should never set it themselves.

Type:str
__str__()[source]

A human-readable representation of this message.

This should provide a detailed, long-form representation of the message. The default implementation is to format the raw message id, topic, headers, and body.

Note

Sub-classes should override this method. It is used to create the body of email notifications and by other tools to display messages to humans.

agent_avatar

An URL to the avatar of the user who caused the action.

Note

Sub-classes should override this method if the message was triggered by a particular user.

Returns:The URL to the user’s avatar.
Return type:str or None
app_icon

An URL to the icon of the application that generated the message.

Note

Sub-classes should override this method if their application has an icon and they wish that image to appear in applications that consume messages.

Returns:The URL to the app’s icon.
Return type:str or None
containers

List of containers affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to one or more container images. The data returned from this property is used to filter notifications.

Returns:A list of affected container names.
Return type:list(str)
flatpaks

List of flatpaks affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to one or more flatpaks. The data returned from this property is used to filter notifications.

Returns:A list of affected flatpaks names.
Return type:list(str)
modules

List of modules affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to one or more modules. The data returned from this property is used to filter notifications.

Returns:A list of affected module names.
Return type:list(str)
packages

List of RPM packages affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to one or more RPM packages. The data returned from this property is used to filter notifications.

Returns:A list of affected package names.
Return type:list(str)
summary

A short, human-readable representation of this message.

This should provide a short summary of the message, much like the subject line of an email.

Note

Sub-classes should override this method. It is used to create the subject of email notifications, IRC notification, and by other tools to display messages to humans in short form.

The default implementation is to simply return the message topic.

url

An URL to the action that caused this message to be emitted.

Note

Sub-classes should override this method if there is a URL associated with message.

Returns:A relevant URL.
Return type:str or None
usernames

List of users affected by the action that generated this message.

Note

Sub-classes should override this method if the message pertains to a user or users. The data returned from this property is used to filter notifications.

Returns:A list of affected usernames.
Return type:list(str)
validate()[source]

Validate the headers and body with the message schema, if any.

In addition to the user-provided schema, all messages are checked against the base schema which requires certain message headers and the that body be a JSON object.

Warning

This method should not be overridden by sub-classes.

Raises:
  • jsonschema.ValidationError – If either the message headers or the message body are invalid.
  • jsonschema.SchemaError – If either the message header schema or the message body schema are invalid.

Message Severity

Each message can have a severity associated with it. The severity is used by applications like the notification service to determine what messages to send to users. The severity can be set at the class level, or on a message-by-message basis. The following are valid severity levels:

DEBUG

fedora_messaging.message.DEBUG = 10

Indicates the message is for debugging or is otherwise very low priority. Users will not be notified unless they’ve explicitly requested DEBUG level messages.

INFO

fedora_messaging.message.INFO = 20

Indicates the message is informational. End users will not receive notifications for these messages by default. For example, automated tests passed for their package.

WARNING

fedora_messaging.message.WARNING = 30

Indicates a problem or an otherwise important problem. Users are notified of these messages when they pertain to packages they are associated with by default. For example, one or more automated tests failed against their package.

ERROR

fedora_messaging.message.ERROR = 40

Indicates a critically important message that users should act upon as soon as possible. For example, their package no longer builds.

SEVERITIES

fedora_messaging.message.SEVERITIES = (10, 20, 30, 40)

A tuple of all valid severity levels

dumps

fedora_messaging.message.dumps(messages)[source]

Serialize messages to a file format acceptable for loads() or for the publish CLI command. The format is a string where each line is a JSON object that conforms to the SERIALIZED_MESSAGE_SCHEMA format.

Parameters:messages (list or Message) – The messages to serialize. Each message in the messages is subclass of Message.
Returns:Serialized messages.
Return type:str
Raises:ValidationError – If one of the messages provided doesn’t conform to its schema.

loads

fedora_messaging.message.loads(serialized_messages)[source]

Deserialize messages from a file format produced by dumps(). The format is a string where each line is a JSON object that conforms to the SERIALIZED_MESSAGE_SCHEMA format.

Parameters:serialized_messages (str) – A string made up of a JSON object per line.
Returns:Deserialized message objects.
Return type:list
Raises:ValidationError – If the string isn’t formatted properly or message doesn’t pass the message schema validation

SERIALIZED_MESSAGE_SCHEMA

fedora_messaging.message.SERIALIZED_MESSAGE_SCHEMA = {'$schema': 'http://json-schema.org/draft-04/schema#', 'description': 'Schema for the JSON object used to represent messages in a file', 'properties': {'body': {'description': 'The message body.', 'type': 'object'}, 'headers': {'description': 'The message headers', 'properties': {'fedora_messaging_schema': {'type': 'string'}, 'fedora_messaging_severity': {'enum': [10, 20, 30, 40], 'type': 'number'}, 'sent-at': {'type': 'string'}}, 'type': 'object'}, 'id': {'description': "The message's UUID.", 'type': 'string'}, 'queue': {'description': 'The queue the message arrived on, if any.', 'type': 'string'}, 'topic': {'description': 'The message topic', 'type': 'string'}}, 'required': ['topic', 'headers', 'id', 'body', 'queue'], 'type': 'object'}

The schema for each JSON object produced by dumps(), consumed by loads(), and expected by CLI commands like “fedora-messaging publish”.

Utilities

The schema_utils module contains utilities that may be useful when writing the Python API of your message schemas.

libravatar_url

fedora_messaging.schema_utils.libravatar_url(email=None, openid=None, size=64, default='retro')[source]

Get the URL to an avatar from libravatar.

Either the user’s email or openid must be provided.

If you want to use Libravatar federation (through DNS), you should install and use the libravatar library instead. Check out the libravatar.libravatar_url() function.

Parameters:
  • email (str) – The user’s email
  • openid (str) – The user’s OpenID
  • size (int) – Size of the avatar in pixels (it’s a square).
  • default (str) – Default avatar to return if not found.
Returns:

The URL to the avatar image.

Return type:

str

Raises:

ValueError – If neither email nor openid are provided.

Exceptions

Exceptions raised by Fedora Messaging.

exception fedora_messaging.exceptions.BadDeclaration(obj_type=None, description=None, reason=None)[source]

Raised when declaring an object in AMQP fails.

Parameters:
  • obj_type (str) – The type of object being declared. One of “binding”, “queue”, or “exchange”.
  • description (dict) – The description of the object.
  • reason (str) – The reason the server gave for rejecting the declaration.
exception fedora_messaging.exceptions.BaseException[source]

The base class for all exceptions raised by fedora_messaging.

exception fedora_messaging.exceptions.ConfigurationException(message)[source]

Raised when there’s an invalid configuration setting

Parameters:message (str) – A detailed description of the configuration problem which is presented to the user.
exception fedora_messaging.exceptions.ConnectionException(*args, **kwargs)[source]

Raised if a general connection error occurred.

You may handle this exception by logging it and resending or discarding the message.

exception fedora_messaging.exceptions.ConsumeException[source]

Base class for exceptions related to consuming.

exception fedora_messaging.exceptions.ConsumerCanceled[source]

Raised when the server has canceled the consumer.

This can happen when the queue the consumer is subscribed to is deleted, or when the node the queue is located on fails.

exception fedora_messaging.exceptions.Drop[source]

Consumer callbacks should raise this to indicate they wish the message they are currently processing to be dropped.

exception fedora_messaging.exceptions.HaltConsumer(exit_code=0, reason=None, requeue=False, **kwargs)[source]

Consumer callbacks should raise this exception if they wish the consumer to be shut down.

Parameters:
  • exit_code (int) – The exit code to use when halting.
  • reason (str) – A reason for halting, presented to the user.
  • requeue (bool) – If true, the message is re-queued for later processing.
exception fedora_messaging.exceptions.Nack[source]

Consumer callbacks should raise this to indicate they wish the message they are currently processing to be re-queued.

exception fedora_messaging.exceptions.NoFreeChannels[source]

Raised when a connection has reached its channel limit

exception fedora_messaging.exceptions.PermissionException(obj_type=None, description=None, reason=None)[source]

Generic permissions exception.

Parameters:
  • obj_type (str) – The type of object being accessed that caused the permission error. May be None if the cause is unknown.
  • description (object) – The description of the object, if any. May be None.
  • reason (str) – The reason the server gave for the permission error, if any. If no reason is supplied by the server, this should be the best guess for what caused the error.
exception fedora_messaging.exceptions.PublishException(reason=None, **kwargs)[source]

Base class for exceptions related to publishing.

exception fedora_messaging.exceptions.PublishReturned(reason=None, **kwargs)[source]

Raised when the broker rejects and returns the message to the publisher.

You may handle this exception by logging it and resending or discarding the message.

exception fedora_messaging.exceptions.PublishTimeout(reason=None, **kwargs)[source]

Raised when the message could not be published in the given timeout.

This means the message was either never delivered to the broker, or that it was delivered, but never acknowledged by the broker.

exception fedora_messaging.exceptions.ValidationError[source]

This error is raised when a message fails validation with its JSON schema

This exception can be raised on an incoming or outgoing message. No need to catch this exception when publishing, it should warn you during development and testing that you’re trying to publish a message with a different format, and that you should either fix it or update the schema.

Configuration

conf

fedora_messaging.config.conf = {}

The configuration dictionary used by fedora-messaging and consumers.

DEFAULTS

fedora_messaging.config.DEFAULTS = {'amqp_url': 'amqp://?connection_attempts=3&retry_delay=5', 'bindings': [{'queue': '1c9083d4-33d0-4c7a-933a-0031ac8e2de3', 'exchange': 'amq.topic', 'routing_keys': ['#']}], 'callback': None, 'client_properties': {'app': 'Unknown', 'information': 'https://fedora-messaging.readthedocs.io/en/stable/', 'product': 'Fedora Messaging with Pika', 'version': 'fedora_messaging-1.7.0 with pika-1.1.0'}, 'consumer_config': {}, 'exchanges': {'amq.topic': {'arguments': {}, 'auto_delete': False, 'durable': True, 'type': 'topic'}}, 'log_config': {'disable_existing_loggers': False, 'formatters': {'simple': {'format': '[%(name)s %(levelname)s] %(message)s'}}, 'handlers': {'console': {'class': 'logging.StreamHandler', 'formatter': 'simple', 'stream': 'ext://sys.stdout'}}, 'loggers': {'fedora_messaging': {'handlers': ['console'], 'level': 'INFO', 'propagate': False}}, 'root': {'handlers': ['console'], 'level': 'WARNING'}, 'version': 1}, 'passive_declares': False, 'publish_exchange': 'amq.topic', 'qos': {'prefetch_count': 10, 'prefetch_size': 0}, 'queues': {'1c9083d4-33d0-4c7a-933a-0031ac8e2de3': {'arguments': {}, 'auto_delete': True, 'durable': False, 'exclusive': False}}, 'tls': {'ca_cert': None, 'certfile': None, 'keyfile': None}, 'topic_prefix': ''}

The default configuration settings for fedora-messaging. This should not be modified and should be copied with copy.deepcopy().

Twisted

In addition to the synchronous API, a Twisted API is provided for applications that need an asynchronous API. This API requires Twisted 16.1.0 or greater.

Note

This API is deprecated, please use fedora_messaging.api.twisted_consume

Protocol

The core Twisted interface, a protocol represent a specific connection to the AMQP broker.

The FedoraMessagingProtocolV2 has replaced the deprecated FedoraMessagingProtocolV2. This class inherits the pika.adapters.twisted_connection.TwistedProtocolConnection class and adds a few additional methods.

When combined with the fedora_messaging.twisted.factory.FedoraMessagingFactory class, it’s easy to create AMQP consumers that last across connections.

For an overview of Twisted clients, see the Twisted client documentation.

class fedora_messaging.twisted.protocol.FedoraMessagingProtocol(parameters, confirms=True)[source]

A Twisted Protocol for the Fedora Messaging system.

This protocol builds on the generic pika AMQP protocol to add calls specific to the Fedora Messaging implementation.

Warning

This class is deprecated, use the FedoraMessagingProtocolV2.

Parameters:
  • parameters (pika.ConnectionParameters) – The connection parameters.
  • confirms (bool) – If True, all outgoing messages will require a confirmation from the server, and the Deferred returned from the publish call will wait for that confirmation.
cancel(queue)[source]

Cancel the consumer for a queue.

Parameters:queue (str) – The name of the queue the consumer is subscribed to.
Returns:
A Deferred that fires when the consumer
is canceled, or None if the consumer was already canceled. Wrap the call in defer.maybeDeferred() to always receive a Deferred.
Return type:defer.Deferred
consume(callback, queue)[source]

Register a message consumer that executes the provided callback when messages are received.

The queue must exist prior to calling this method. If a consumer already exists for the given queue, the callback is simply updated and any new messages for that consumer use the new callback.

If resumeProducing() has not been called when this method is called, it will be called for you.

Parameters:
  • callback (callable) – The callback to invoke when a message is received.
  • queue (str) – The name of the queue to consume from.
Returns:

A namedtuple that identifies this consumer.

Return type:

fedora_messaging.twisted.protocol.Consumer

NoFreeChannels: If there are no available channels on this connection.
If this occurs, you can either reduce the number of consumers on this connection or create an additional connection.
pauseProducing()[source]

Pause the reception of messages by canceling all existing consumers. This does not disconnect from the server.

Message reception can be resumed with resumeProducing().

Returns:fired when the production is paused.
Return type:Deferred
resumeProducing()[source]

Starts or resumes the retrieval of messages from the server queue.

This method starts receiving messages from the server, they will be passed to the consumer callback.

Note

This is called automatically when consume() is called, so users should not need to call this unless pauseProducing() has been called.

Returns:fired when the production is ready to start
Return type:defer.Deferred
stopProducing()[source]

Stop producing messages and disconnect from the server. :returns: fired when the production is stopped. :rtype: Deferred

class fedora_messaging.twisted.protocol.Consumer(tag, queue, callback, channel)

A namedtuple that represents a AMQP consumer.

This is deprecated. Use fedora_messaging.twisted.consumer.Consumer.

  • The tag field is the consumer’s AMQP tag (str).
  • The queue field is the name of the queue it’s consuming from (str).
  • The callback field is the function called for each message (a callable).
  • The channel is the AMQP channel used for the consumer (pika.adapters.twisted_connection.TwistedChannel).
callback

Alias for field number 2

channel

Alias for field number 3

queue

Alias for field number 1

tag

Alias for field number 0

Factory

A Twisted Factory for creating and configuring instances of the FedoraMessagingProtocol.

A factory is used to implement automatic re-connections by producing protocol instances (connections) on demand. Twisted uses factories for its services APIs.

See the Twisted client documentation for more information.

class fedora_messaging.twisted.factory.FedoraMessagingFactory(parameters, confirms=True, exchanges=None, queues=None, bindings=None)[source]

Reconnecting factory for the Fedora Messaging protocol.

buildProtocol(addr)[source]

Create the Protocol instance.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

cancel(queue)[source]

Cancel the consumer for a queue.

This removes the consumer from the list of consumers to be configured for every connection.

Parameters:queue (str) – The name of the queue the consumer is subscribed to.
Returns:
Either a Deferred that fires when the consumer
is canceled, or None if the consumer was already canceled. Wrap the call in defer.maybeDeferred() to always receive a Deferred.
Return type:defer.Deferred or None
clientConnectionFailed(connector, reason)[source]

Called when the client has failed to connect to the broker.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

clientConnectionLost(connector, reason)[source]

Called when the connection to the broker has been lost.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

consume(callback, queue)[source]

Register a new consumer.

This consumer will be configured for every protocol this factory produces so it will be reconfigured on network failures. If a connection is already active, the consumer will be added to it.

Parameters:
  • callback (callable) – The callback to invoke when a message arrives.
  • queue (str) – The name of the queue to consume from.
protocol

alias of fedora_messaging.twisted.protocol.FedoraMessagingProtocol

publish(message, exchange=None)[source]

Publish a fedora_messaging.message.Message to an exchange on the message broker. This call will survive connection failures and try until it succeeds or is canceled.

Parameters:
Returns:

A deferred that fires when the message is published.

Return type:

defer.Deferred

Raises:
  • PublishReturned – If the published message is rejected by the broker.
  • ConnectionException – If a connection error occurs while publishing. Calling this method again will wait for the next connection and publish when it is available.
startedConnecting(connector)[source]

Called when the connection to the broker has started.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

stopFactory()[source]

Stop the factory.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

stopTrying()[source]

Stop trying to reconnect to the broker.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

whenConnected()[source]

Get the next connected protocol instance.

Returns:
A deferred that results in a connected
FedoraMessagingProtocol.
Return type:defer.Deferred
class fedora_messaging.twisted.factory.FedoraMessagingFactoryV2(parameters, confirms=True)[source]

Reconnecting factory for the Fedora Messaging protocol.

buildProtocol(addr)[source]

Create the Protocol instance.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

cancel(consumers)[source]

Cancel a consumer that was previously started with consume.

Parameters:consumer (list of fedora_messaging.api.Consumer) – The consumers to cancel.
consume(callback, bindings, queues)[source]

Start a consumer that lasts across individual connections.

Parameters:
  • callback (callable) – A callable object that accepts one positional argument, a Message or a class object that implements the __call__ method. The class will be instantiated before use.
  • bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.
  • queues (dict) – The queues to declare and consume from. Each key in this dictionary is a queue, and each value is its settings as a dictionary. These settings dictionaries should have the “durable”, “auto_delete”, “exclusive”, and “arguments” keys. Refer to queues for details on their meanings.
Returns:

A deferred that fires with the list of one or more fedora_messaging.twisted.consumer.Consumer objects. These can be passed to the FedoraMessagingFactoryV2.cancel() API to halt them. Each consumer object has a result instance variable that is a Deferred that fires or errors when the consumer halts. The Deferred may error back with a BadDeclaration if the user does not have permissions to consume from the queue.

Return type:

defer.Deferred

publish(message, exchange)[source]

Publish a fedora_messaging.message.Message to an exchange on the message broker. This call will survive connection failures and try until it succeeds or is canceled.

Parameters:
  • message (message.Message) – The message to publish.
  • exchange (str) – The name of the AMQP exchange to publish to.
Returns:

A deferred that fires when the message is published.

Return type:

defer.Deferred

Raises:
  • NoFreeChannels – If there are no available channels on this connection. If this occurs, you can either reduce the number of consumers on this connection or create an additional connection.
  • PublishReturned – If the published message is rejected by the broker.
  • ConnectionException – If a connection error occurs while publishing. Calling this method again will wait for the next connection and publish when it is available.
stopFactory()[source]

Stop the factory.

See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.

when_connected()[source]

Retrieve the currently-connected Protocol, or the next one to connect.

Returns:
A Deferred that fires with a connected
FedoraMessagingProtocolV2 instance. This is similar to the whenConnected method from the Twisted endpoints APIs, which is sadly isn’t available before 16.1.0, which isn’t available in EL7.
Return type:defer.Deferred

Service

Twisted Service to start and stop the Fedora Messaging Twisted Factory.

This Service makes it easier to build a Twisted application that embeds a Fedora Messaging component. See the verify_missing service in fedmsg-migration-tools for a use case.

See https://twistedmatrix.com/documents/current/core/howto/application.html

class fedora_messaging.twisted.service.FedoraMessagingService(amqp_url=None, exchanges=None, queues=None, bindings=None, consumers=None)[source]

A Twisted service to connect to the Fedora Messaging broker.

Parameters:
  • on_message (callable|None) – Callback that will be passed each incoming messages. If None, no message consuming is setup.
  • amqp_url (str) – URL to use for the AMQP server.
  • exchanges (list of dicts) – List of exchanges to declare at the start of every connection. Each dictionary is passed to pika.channel.Channel.exchange_declare() as keyword arguments, so any parameter to that method is a valid key.
  • queues (list of dicts) – List of queues to declare at the start of every connection. Each dictionary is passed to pika.channel.Channel.queue_declare() as keyword arguments, so any parameter to that method is a valid key.
  • bindings (list of dicts) – A list of bindings to be created between queues and exchanges. Each dictionary is passed to pika.channel.Channel.queue_bind(). The “queue” and “exchange” keys are required.
  • consumers (dict) – A dictionary where each key is a queue name and the value is a callable object to handle messages on that queue. Consumers will be set up after each connection is established so they will survive networking issues.
factoryClass

alias of fedora_messaging.twisted.factory.FedoraMessagingFactory

class fedora_messaging.twisted.service.FedoraMessagingServiceV2(amqp_url=None, publish_confirms=True)[source]

A Twisted service to connect to the Fedora Messaging broker.

Parameters:
  • amqp_url (str) – URL to use for the AMQP server.
  • publish_confirms (bool) – If true, use the RabbitMQ publisher confirms AMQP extension.
stopService()[source]

Gracefully stop the service.

Returns:
a Deferred which is triggered when the service has
finished shutting down.
Return type:defer.Deferred