Developer Interface¶
This documentation covers the public interfaces fedora_messaging provides.
Note
Documented interfaces follow Semantic Versioning 2.0.0. Any interface not documented here may change at any time without warning.
API Table of Contents
Publishing¶
publish¶
-
fedora_messaging.api.
publish
(message, exchange=None, timeout=30)[source]¶ Publish a message to an exchange.
This is a synchronous call, meaning that when this function returns, an acknowledgment has been received from the message broker and you can be certain the message was published successfully.
There are some cases where an error occurs despite your message being successfully published. For example, if a network partition occurs after the message is received by the broker. Therefore, you may publish duplicate messages. For complete details, see the Publishing documentation.
>>> from fedora_messaging import api >>> message = api.Message(body={'Hello': 'world'}, topic='Hi') >>> api.publish(message)
If an attempt to publish fails because the broker rejects the message, it is not retried. Connection attempts to the broker can be configured using the “connection_attempts” and “retry_delay” options in the broker URL. See
pika.connection.URLParameters
for details.Parameters: - message (message.Message) – The message to publish.
- exchange (str) – The name of the AMQP exchange to publish to; defaults to publish_exchange
- timeout (int) – The maximum time in seconds to wait before giving up attempting to publish the message. If the timeout is reached, a PublishTimeout exception is raised.
Raises: fedora_messaging.exceptions.PublishReturned
– Raised if the broker rejects the message.fedora_messaging.exceptions.PublishTimeout
– Raised if the broker could not be contacted in the given timeout time.fedora_messaging.exceptions.PublishForbidden
– Raised if the broker rejects the message because of permission issues.fedora_messaging.exceptions.ValidationError
– Raised if the message fails validation with its JSON schema. This only depends on the message you are trying to send, the AMQP server is not involved.
Subscribing¶
twisted_consume¶
-
fedora_messaging.api.
twisted_consume
(callback, bindings=None, queues=None)[source]¶ Start a consumer using the provided callback and run it using the Twisted event loop (reactor).
Note
Callbacks run in a Twisted-managed thread pool using the
twisted.internet.threads.deferToThread()
API to avoid them blocking the event loop. If you wish to use Twisted APIs in your callback you must use thetwisted.internet.threads.blockingCallFromThread()
ortwisted.internet.interfaces.IReactorFromThreads
APIs.This API expects the caller to start the reactor.
Parameters: - callback (callable) – A callable object that accepts one positional argument,
a
Message
or a class object that implements the__call__
method. The class will be instantiated before use. - bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.
- queues (dict) – The queue to declare and consume from. Each key in this dictionary should be a queue name to declare, and each value should be a dictionary with the “durable”, “auto_delete”, “exclusive”, and “arguments” keys.
Raises: ValueError
– If the callback, bindings, or queues are invalid.Returns: A deferred that fires with the list of one or more
Consumer
objects. Each consumer object has aConsumer.result
instance variable that is a Deferred that fires or errors when the consumer halts. Note that this API is meant to survive network problems, so consuming will continue untilConsumer.cancel()
is called or a fatal server error occurs. The deferred returned by this function may error back with afedora_messaging.exceptions.BadDeclaration
if queues or bindings cannot be declared on the broker, afedora_messaging.exceptions.PermissionException
if the user doesn’t have access to the queue, orfedora_messaging.exceptions.ConnectionException
if the TLS or AMQP handshake fails.Return type: - callback (callable) – A callable object that accepts one positional argument,
a
Consumer¶
-
class
fedora_messaging.api.
Consumer
(queue=None, callback=None)[source]¶ Represents a Twisted AMQP consumer and is returned from the call to
fedora_messaging.api.twisted_consume()
.-
callback
¶ The callback to run when a message arrives.
Type: callable
-
result
¶ A deferred that runs the callbacks if the consumer exits gracefully after being canceled by a call to
Consumer.cancel()
and errbacks if the consumer stops for any other reason. The reasons a consumer could stop are: afedora_messaging.exceptions.PermissionExecption
if the consumer does not have permissions to read from the queue it is subscribed to, aHaltConsumer
is raised by the consumer indicating it wishes to halt, an unexpectedException
is raised by the consumer, or if the consumer is canceled by the server which happens if the queue is deleted by an administrator or if the node the queue lives on fails.Type: twisted.internet.defer.Deferred
-
cancel
()[source]¶ Cancel the consumer and clean up resources associated with it. Consumers that are canceled are allowed to finish processing any messages before halting.
Returns: A deferred that fires when the consumer has finished processing any message it was in the middle of and has been successfully canceled. Return type: defer.Deferred
-
consume¶
-
fedora_messaging.api.
consume
(callback, bindings=None, queues=None)[source]¶ Start a message consumer that executes the provided callback when messages are received.
This API is blocking and will not return until the process receives a signal from the operating system.
Warning
This API is runs the callback in the IO loop thread. This means if your callback could run for a length of time near the heartbeat interval, which is likely on the order of 60 seconds, the broker will kill the TCP connection and the message will be re-delivered on start-up.
For now, use the
twisted_consume()
API which runs the callback in a thread and continues to handle AMQP events while the callback runs if you have a long-running callback.The callback receives a single positional argument, the message:
>>> from fedora_messaging import api >>> def my_callback(message): ... print(message) >>> bindings = [{'exchange': 'amq.topic', 'queue': 'demo', 'routing_keys': ['#']}] >>> queues = { ... "demo": {"durable": False, "auto_delete": True, "exclusive": True, "arguments": {}} ... } >>> api.consume(my_callback, bindings=bindings, queues=queues)
If the bindings and queue arguments are not provided, they will be loaded from the configuration.
For complete documentation on writing consumers, see the Consumers documentation.
Parameters: - callback (callable) – A callable object that accepts one positional argument,
a
Message
or a class object that implements the__call__
method. The class will be instantiated before use. - bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.
- queues (dict) – The queue or queues to declare and consume from. This should be in the same format as the queues configuration dictionary where each key is a queue name and each value is a dictionary of settings for that queue.
Raises: fedora_messaging.exceptions.HaltConsumer
– If the consumer requests that it be stopped.ValueError
– If the consumer provides a callback that is not a class that implements __call__ and is not a function, if the bindings argument is not a dict or list of dicts with the proper keys, or if the queues argument isn’t a dict with the proper keys.
- callback (callable) – A callable object that accepts one positional argument,
a
Signals¶
Signals sent by fedora_messaging APIs using blinker.base.Signal
signals.
pre_publish_signal¶
-
fedora_messaging.api.
pre_publish_signal
= <blinker.base.NamedSignal object at 0x7f0b38f3c510; 'pre_publish'>¶ A signal triggered before the message is published. The signal handler should accept a single keyword argument,
message
, which is the instance of thefedora_messaging.message.Message
being sent. It is acceptable to mutate the message, but thevalidate
method will be called on it after this signal.
publish_signal¶
-
fedora_messaging.api.
publish_signal
= <blinker.base.NamedSignal object at 0x7f0b3cf14110; 'publish_success'>¶ A signal triggered after a message is published successfully. The signal handler should accept a single keyword argument,
message
, which is the instance of thefedora_messaging.message.Message
that was sent.
publish_failed_signal¶
-
fedora_messaging.api.
publish_failed_signal
= <blinker.base.NamedSignal object at 0x7f0b38e35910; 'publish_failed_signal'>¶ A signal triggered after a message fails to publish for some reason. The signal handler should accept two keyword argument,
message
, which is the instance of thefedora_messaging.message.Message
that failed to be sent, anderror
, the exception that was raised.
Message Schemas¶
This module defines the base class of message objects and keeps a registry of known message implementations. This registry is populated from Python entry points in the “fedora.messages” group.
To implement your own message schema, simply create a class that inherits the
Message
class, and add an entry point in your Python package under the
“fedora.messages” group. For example, an entry point for the Message
schema would be:
entry_points = {
'fedora.messages': [
'base.message=fedora_messaging.message:Message'
]
}
The entry point name must be unique to your application and is used to map
messages to your message class, so it’s best to prefix it with your application
name (e.g. bodhi.new_update_messageV1
). When publishing, the Fedora
Messaging library will add a header with the entry point name of the class used
so the consumer can locate the correct schema.
Since every client needs to have the message schema installed, you should define this class in a small Python package of its own.
Message¶
-
class
fedora_messaging.message.
Message
(body=None, headers=None, topic=None, properties=None, severity=None)[source]¶ Messages are simply JSON-encoded objects. This allows message authors to define a schema and implement Python methods to abstract the raw message from the user. This allows the schema to change and evolve without breaking the user-facing API.
There are a number of properties that are intended to be overridden by users. These fields are used to sort messages for notifications or are used to create human-readable versions of the messages. Properties that are intended for this purpose are noted in their attribute documentation below.
Parameters: - headers (dict) – A set of message headers. Consult the headers schema for expected keys and values.
- body (dict) – The message body. Consult the body schema for expected keys and values. This dictionary must be JSON-serializable by the default serializer.
- topic (str) – The message topic as a unicode string. If this is not provided, the default topic for the class is used. See the attribute documentation below for details.
- properties (pika.BasicProperties) – The AMQP properties. If this is not provided, they will be generated. Most users should not need to provide this, but it can be useful in testing scenarios.
- severity (int) – An integer that indicates the severity of the message. This is
used to determine what messages to notify end users about and should be
DEBUG
,INFO
,WARNING
, orERROR
. The default isINFO
, and can be set as a class attribute or on an instance-by-instance basis.
-
id
¶ The message id as a unicode string. This attribute is automatically generated and set by the library and users should only set it themselves in testing scenarios.
Type: str
-
topic
¶ The message topic as a unicode string. The topic is used by message consumers to filter what messages they receive. Topics should be a string of words separated by ‘.’ characters, with a length limit of 255 bytes. Because of this byte limit, it is best to avoid non-ASCII character. Topics should start general and get more specific each word. For example: “bodhi.update.kernel” is a possible topic. “bodhi” identifies the application, “update” identifies the message, and “kernel” identifies the package in the update. This can be set at a class level or on a instance level. Dynamic, specific topics that allow for fine-grain filtering are preferred.
Type: str
-
headers_schema
¶ A JSON schema to be used with
jsonschema.validate()
to validate the message headers. For most users, the default definition should suffice.Type: dict
-
body_schema
¶ A JSON schema to be used with
jsonschema.validate()
to validate the message body. The body_schema is retrieved on a message instance so it is not required to be a class attribute, although this is a convenient approach. Users are also free to write the JSON schema as a file and load the file from the filesystem or network if they prefer.Type: dict
-
body
¶ The message body as a Python dictionary. This is validated by the body schema before publishing and before consuming.
Type: dict
-
severity
¶ An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be
DEBUG
,INFO
,WARNING
, orERROR
. The default isINFO
, and can be set as a class attribute or on an instance-by-instance basis.Type: int
-
queue
¶ The name of the queue this message arrived through. This attribute is set automatically by the library and users should never set it themselves.
Type: str
-
deprecated
¶ Whether this message schema has been deprecated by a more recent version. Emits a warning when a message of this class is received, to let consumers know that they should plan to upgrade. Defaults to
False
.Type: bool
-
priority
¶ The priority for the message, if the destination queue supports it. Defaults to zero (lowest priority).
This value is taken into account in queues that have the
x-max-priority
argument set. Most queues in Fedora don’t support priorities, in which case the value will be ignored.Larger numbers indicate higher priority, you can read more about it in RabbitMQ’s documentation on priority.
Type: int
-
__str__
()[source]¶ A human-readable representation of this message.
This should provide a detailed, long-form representation of the message. The default implementation is to format the raw message id, topic, headers, and body.
Note
Sub-classes should override this method. It is used to create the body of email notifications and by other tools to display messages to humans.
-
agent_avatar
¶ An URL to the avatar of the user who caused the action.
Note
Sub-classes should override this method if the default Libravatar and OpenID-based URL generator is not appropriate.
Returns: The URL to the user’s avatar. Return type: str or None
-
agent_name
¶ The username of the user who caused the action.
Note
Sub-classes should override this method if the message was triggered by a particular user.
Returns: The agent’s username. Return type: str or None
-
app_icon
¶ An URL to the icon of the application that generated the message.
Note
Sub-classes should override this method if their application has an icon and they wish that image to appear in applications that consume messages.
Returns: The URL to the app’s icon. Return type: str or None
-
app_name
¶ The name of the application that generated the message.
Note
Sub-classes should override this method.
Returns: The name of the application. Return type: str or None
-
containers
¶ List of containers affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to one or more container images. The data returned from this property is used to filter notifications.
Returns: A list of affected container names. Return type: list(str)
-
flatpaks
¶ List of flatpaks affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to one or more flatpaks. The data returned from this property is used to filter notifications.
Returns: A list of affected flatpaks names. Return type: list(str)
-
groups
¶ List of groups affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to a group or groups. The data returned from this property is used to filter notifications.
Returns: A list of affected groups. Return type: list(str)
-
modules
¶ List of modules affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to one or more modules. The data returned from this property is used to filter notifications.
Returns: A list of affected module names. Return type: list(str)
-
packages
¶ List of RPM packages affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to one or more RPM packages. The data returned from this property is used to filter notifications.
Returns: A list of affected package names. Return type: list(str)
-
summary
¶ A short, human-readable representation of this message.
This should provide a short summary of the message, much like the subject line of an email.
Note
Sub-classes should override this method. It is used to create the subject of email notifications, IRC notification, and by other tools to display messages to humans in short form.
The default implementation is to simply return the message topic.
-
url
¶ An URL to the action that caused this message to be emitted.
Note
Sub-classes should override this method if there is a URL associated with message.
Returns: A relevant URL. Return type: str or None
-
usernames
¶ List of users affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to a user or users. The data returned from this property is used to filter notifications.
Returns: A list of affected usernames. Return type: list(str)
-
validate
()[source]¶ Validate the headers and body with the message schema, if any.
In addition to the user-provided schema, all messages are checked against the base schema which requires certain message headers and the that body be a JSON object.
Warning
This method should not be overridden by sub-classes.
Raises: jsonschema.ValidationError
– If either the message headers or the message body are invalid.jsonschema.SchemaError
– If either the message header schema or the message body schema are invalid.
Message Severity¶
Each message can have a severity associated with it. The severity is used by applications like the notification service to determine what messages to send to users. The severity can be set at the class level, or on a message-by-message basis. The following are valid severity levels:
DEBUG¶
-
fedora_messaging.message.
DEBUG
= 10¶ Indicates the message is for debugging or is otherwise very low priority. Users will not be notified unless they’ve explicitly requested DEBUG level messages.
INFO¶
-
fedora_messaging.message.
INFO
= 20¶ Indicates the message is informational. End users will not receive notifications for these messages by default. For example, automated tests passed for their package.
WARNING¶
-
fedora_messaging.message.
WARNING
= 30¶ Indicates a problem or an otherwise important problem. Users are notified of these messages when they pertain to packages they are associated with by default. For example, one or more automated tests failed against their package.
ERROR¶
-
fedora_messaging.message.
ERROR
= 40¶ Indicates a critically important message that users should act upon as soon as possible. For example, their package no longer builds.
SEVERITIES¶
-
fedora_messaging.message.
SEVERITIES
= (10, 20, 30, 40)¶ A tuple of all valid severity levels
dumps¶
-
fedora_messaging.message.
dumps
(messages)[source]¶ Serialize messages to a file format acceptable for
loads()
or for the publish CLI command. The format is a string where each line is a JSON object that conforms to theSERIALIZED_MESSAGE_SCHEMA
format.Parameters: messages (list or Message) – The messages to serialize. Each message in the messages is subclass of Message. Returns: Serialized messages. Return type: str Raises: ValidationError
– If one of the messages provided doesn’t conform to its schema.
loads¶
-
fedora_messaging.message.
loads
(serialized_messages)[source]¶ Deserialize messages from a file format produced by
dumps()
. The format is a string where each line is a JSON object that conforms to theSERIALIZED_MESSAGE_SCHEMA
format.Parameters: serialized_messages (str) – A string made up of a JSON object per line. Returns: Deserialized message objects. Return type: list Raises: ValidationError
– If the string isn’t formatted properly or message doesn’t pass the message schema validation
SERIALIZED_MESSAGE_SCHEMA¶
-
fedora_messaging.message.
SERIALIZED_MESSAGE_SCHEMA
= {'$schema': 'http://json-schema.org/draft-04/schema#', 'description': 'Schema for the JSON object used to represent messages in a file', 'properties': {'body': {'description': 'The message body.', 'type': 'object'}, 'headers': {'description': 'The message headers', 'properties': {'fedora_messaging_schema': {'type': 'string'}, 'fedora_messaging_severity': {'enum': [10, 20, 30, 40], 'type': 'number'}, 'sent-at': {'type': 'string'}}, 'type': 'object'}, 'id': {'description': "The message's UUID.", 'type': 'string'}, 'priority': {'description': 'The priority that the message has been sent with.', 'type': ['integer', 'null']}, 'queue': {'description': 'The queue the message arrived on, if any.', 'type': ['string', 'null']}, 'topic': {'description': 'The message topic', 'type': 'string'}}, 'required': ['topic', 'id', 'body'], 'type': 'object'}¶ The schema for each JSON object produced by
dumps()
, consumed byloads()
, and expected by CLI commands like “fedora-messaging publish”.
Utilities¶
The schema_utils
module contains utilities that may be useful when writing
the Python API of your message schemas.
libravatar_url¶
-
fedora_messaging.schema_utils.
libravatar_url
(email=None, openid=None, size=64, default='retro')[source]¶ Get the URL to an avatar from libravatar.
Either the user’s email or openid must be provided.
If you want to use Libravatar federation (through DNS), you should install and use the
libravatar
library instead. Check out thelibravatar.libravatar_url()
function.Parameters: Returns: The URL to the avatar image.
Return type: Raises: ValueError
– If neither email nor openid are provided.
Exceptions¶
Exceptions raised by Fedora Messaging.
-
exception
fedora_messaging.exceptions.
BadDeclaration
(obj_type=None, description=None, reason=None)[source]¶ Raised when declaring an object in AMQP fails.
Parameters:
-
exception
fedora_messaging.exceptions.
BaseException
[source]¶ The base class for all exceptions raised by fedora_messaging.
-
exception
fedora_messaging.exceptions.
ConfigurationException
(message)[source]¶ Raised when there’s an invalid configuration setting
Parameters: message (str) – A detailed description of the configuration problem which is presented to the user.
-
exception
fedora_messaging.exceptions.
ConnectionException
(*args, **kwargs)[source]¶ Raised if a general connection error occurred.
You may handle this exception by logging it and resending or discarding the message.
-
exception
fedora_messaging.exceptions.
ConsumeException
[source]¶ Base class for exceptions related to consuming.
-
exception
fedora_messaging.exceptions.
ConsumerCanceled
[source]¶ Raised when the server has canceled the consumer.
This can happen when the queue the consumer is subscribed to is deleted, or when the node the queue is located on fails.
-
exception
fedora_messaging.exceptions.
Drop
[source]¶ Consumer callbacks should raise this to indicate they wish the message they are currently processing to be dropped.
-
exception
fedora_messaging.exceptions.
HaltConsumer
(exit_code=0, reason=None, requeue=False, **kwargs)[source]¶ Consumer callbacks should raise this exception if they wish the consumer to be shut down.
Parameters:
-
exception
fedora_messaging.exceptions.
Nack
[source]¶ Consumer callbacks should raise this to indicate they wish the message they are currently processing to be re-queued.
-
exception
fedora_messaging.exceptions.
NoFreeChannels
[source]¶ Raised when a connection has reached its channel limit
-
exception
fedora_messaging.exceptions.
PermissionException
(obj_type=None, description=None, reason=None)[source]¶ Generic permissions exception.
Parameters: - obj_type (str) – The type of object being accessed that caused the permission error. May be None if the cause is unknown.
- description (object) – The description of the object, if any. May be None.
- reason (str) – The reason the server gave for the permission error, if any. If no reason is supplied by the server, this should be the best guess for what caused the error.
-
exception
fedora_messaging.exceptions.
PublishException
(reason=None, **kwargs)[source]¶ Base class for exceptions related to publishing.
-
exception
fedora_messaging.exceptions.
PublishForbidden
(reason=None, **kwargs)[source]¶ Raised when the broker rejects the message due to permission errors.
You may handle this exception by logging it and discarding the message, as it is likely a permanent error.
-
exception
fedora_messaging.exceptions.
PublishReturned
(reason=None, **kwargs)[source]¶ Raised when the broker rejects and returns the message to the publisher.
You may handle this exception by logging it and resending or discarding the message.
-
exception
fedora_messaging.exceptions.
PublishTimeout
(reason=None, **kwargs)[source]¶ Raised when the message could not be published in the given timeout.
This means the message was either never delivered to the broker, or that it was delivered, but never acknowledged by the broker.
-
exception
fedora_messaging.exceptions.
ValidationError
[source]¶ This error is raised when a message fails validation with its JSON schema
This exception can be raised on an incoming or outgoing message. No need to catch this exception when publishing, it should warn you during development and testing that you’re trying to publish a message with a different format, and that you should either fix it or update the schema.
Configuration¶
conf¶
-
fedora_messaging.config.
conf
= {}¶ The configuration dictionary used by fedora-messaging and consumers.
DEFAULTS¶
-
fedora_messaging.config.
DEFAULTS
= {'amqp_url': 'amqp://?connection_attempts=3&retry_delay=5', 'bindings': [{'queue': '', 'exchange': 'amq.topic', 'routing_keys': ['#']}], 'callback': None, 'client_properties': {'app': 'Unknown', 'information': 'https://fedora-messaging.readthedocs.io/en/stable/', 'product': 'Fedora Messaging with Pika', 'version': 'fedora_messaging-3.4.0 with pika-1.3.2'}, 'consumer_config': {}, 'exchanges': {'amq.topic': {'arguments': {}, 'auto_delete': False, 'durable': True, 'type': 'topic'}}, 'log_config': {'disable_existing_loggers': False, 'formatters': {'simple': {'format': '[%(name)s %(levelname)s] %(message)s'}}, 'handlers': {'console': {'class': 'logging.StreamHandler', 'formatter': 'simple', 'stream': 'ext://sys.stdout'}}, 'loggers': {'fedora_messaging': {'handlers': ['console'], 'level': 'INFO', 'propagate': False}}, 'root': {'handlers': ['console'], 'level': 'WARNING'}, 'version': 1}, 'passive_declares': False, 'publish_exchange': 'amq.topic', 'publish_priority': None, 'qos': {'prefetch_count': 10, 'prefetch_size': 0}, 'queues': {'': {'arguments': {}, 'auto_delete': True, 'durable': False, 'exclusive': True}}, 'tls': {'ca_cert': None, 'certfile': None, 'keyfile': None}, 'topic_prefix': ''}¶ The default configuration settings for fedora-messaging. This should not be modified and should be copied with
copy.deepcopy()
.