Developer Interface¶
This documentation covers the public interfaces fedora_messaging provides.
Note
Documented interfaces follow Semantic Versioning 2.0.0. Any interface not documented here may change at any time without warning.
Publishing¶
publish¶
-
fedora_messaging.api.
publish
(message, exchange=None)[source]¶ Publish a message to an exchange.
This is a synchronous call, meaning that when this function returns, an acknowledgment has been received from the message broker and you can be certain the message was published successfully.
There are some cases where an error occurs despite your message being successfully published, for example if a network partition occurs after the message is received by the broker. If you publish again after such an error, you may publish duplicate messages. For complete details, see the Publishing documentation.
>>> from fedora_messaging import api
>>> message = api.Message(body={'Hello': 'world'}, topic='Hi')
>>> api.publish(message)
If an attempt to publish fails because the broker rejects the message, it is not retried. Connection attempts to the broker can be configured using the “connection_attempts” and “retry_delay” options in the broker URL. See pika.connection.URLParameters for details.
Parameters:
- message (message.Message) – The message to publish.
- exchange (str) – The name of the AMQP exchange to publish to; defaults to publish_exchange
Raises:
- fedora_messaging.exceptions.PublishReturned – Raised if the broker rejects the message.
- fedora_messaging.exceptions.ConnectionException – Raised if a connection error occurred before the publish confirmation arrived.
- fedora_messaging.exceptions.ValidationError – Raised if the message fails validation with its JSON schema. This depends only on the message you are trying to send; the AMQP server is not involved.
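The “connection_attempts” and “retry_delay” options mentioned above are ordinary query-string parameters on the broker URL. As a minimal sketch using only the standard library, they could be set like this. The helper name with_retry_options and the localhost URL are assumptions made for illustration and are not part of fedora_messaging:

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_retry_options(amqp_url, connection_attempts=3, retry_delay=5):
    """Return amqp_url with the two retry options set in its query string.

    Hypothetical helper: any options already present in the URL are kept.
    """
    parts = urlsplit(amqp_url)
    query = dict(parse_qsl(parts.query))
    query["connection_attempts"] = str(connection_attempts)
    query["retry_delay"] = str(retry_delay)
    return urlunsplit(parts._replace(query=urlencode(query)))


# Example broker URL (an assumption); %2F is the URL-encoded default vhost "/".
url = with_retry_options(
    "amqp://guest:guest@localhost:5672/%2F", connection_attempts=5, retry_delay=2
)
print(url)
```

The resulting string is the kind of value the amqp_url configuration setting holds; pika.connection.URLParameters remains the reference for what each option means.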
Subscribing¶
twisted_consume¶
-
fedora_messaging.api.
twisted_consume
(callback, bindings=None, queues=None)[source]¶ Start a consumer using the provided callback and run it using the Twisted event loop (reactor).
Note
Callbacks run in a Twisted-managed thread pool using the twisted.internet.threads.deferToThread() API to avoid them blocking the event loop. If you wish to use Twisted APIs in your callback you must use the twisted.internet.threads.blockingCallFromThread() or twisted.internet.interfaces.IReactorFromThreads APIs.
This API expects the caller to start the reactor.
Parameters:
- callback (callable) – A callable object that accepts one positional argument, a Message, or a class object that implements the __call__ method. The class will be instantiated before use.
- bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.
- queues (dict) – The queue to declare and consume from. Each key in this dictionary should be a queue name to declare, and each value should be a dictionary with the “durable”, “auto_delete”, “exclusive”, and “arguments” keys.
Returns: A deferred that fires with the list of one or more Consumer objects. Each consumer object has a Consumer.result instance variable that is a Deferred that fires or errors when the consumer halts. Note that this API is meant to survive network problems, so consuming will continue until Consumer.cancel() is called or a fatal server error occurs. The deferred returned by this function may error back with a fedora_messaging.exceptions.BadDeclaration if queues or bindings cannot be declared on the broker, a fedora_messaging.exceptions.PermissionException if the user doesn’t have access to the queue, or fedora_messaging.exceptions.ConnectionException if the TLS or AMQP handshake fails.
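As a rough sketch of the parameters described above, the following defines a callable class to use as the callback, plus queues and bindings in the documented format. The class name, the queue name "demo" and the run() helper are hypothetical; the library imports happen inside run() only so that the callback can be exercised without a broker:

```python
class TopicRecorder:
    """A callable class; per the text above, the library instantiates it before use."""

    def __init__(self):
        self.topics = []

    def __call__(self, message):
        # Runs in a Twisted-managed thread, so Twisted APIs must not be
        # called directly from here.
        self.topics.append(message.topic)
        print(message.topic)


# Hypothetical queue name; the four settings keys are the documented ones.
QUEUES = {
    "demo": {"durable": False, "auto_delete": True, "exclusive": True, "arguments": {}},
}
BINDINGS = [{"exchange": "amq.topic", "queue": "demo", "routing_keys": ["#"]}]


def run():
    """Start consuming, then start the reactor (the caller's job, as noted above)."""
    from fedora_messaging import api
    from twisted.internet import reactor

    # Pass the class itself, not an instance.
    deferred = api.twisted_consume(TopicRecorder, bindings=BINDINGS, queues=QUEUES)
    deferred.addErrback(
        lambda failure: print("could not start consuming:", failure.getErrorMessage())
    )
    reactor.run()
```

Calling run() blocks until the reactor stops, so it belongs at the very end of a program's start-up code.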
Consumer¶
-
class
fedora_messaging.api.
Consumer
(queue=None, callback=None)[source]¶ Represents a Twisted AMQP consumer and is returned from the call to fedora_messaging.api.twisted_consume().
-
callback
¶ The callback to run when a message arrives.
Type: callable
-
result
¶ A deferred that runs the callbacks if the consumer exits gracefully after being canceled by a call to Consumer.cancel() and errbacks if the consumer stops for any other reason. The reasons a consumer could stop are: a fedora_messaging.exceptions.PermissionException if the consumer does not have permissions to read from the queue it is subscribed to, a HaltConsumer raised by the consumer indicating it wishes to halt, an unexpected Exception raised by the consumer, or the consumer being canceled by the server, which happens if the queue is deleted by an administrator or if the node the queue lives on fails.
Type: twisted.internet.defer.Deferred
-
cancel
()[source]¶ Cancel the consumer and clean up resources associated with it. Consumers that are canceled are allowed to finish processing any messages before halting.
Returns: A deferred that fires when the consumer has finished processing any message it was in the middle of and has been successfully canceled. Return type: defer.Deferred
-
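The Consumer.result deferred described above can be watched with ordinary Deferred callbacks. The sketch below is one possible arrangement and not the library's prescribed usage; the three function names are hypothetical, and the only Twisted calls assumed are Deferred.addCallbacks() and Failure.getErrorMessage():

```python
def on_clean_exit(_result):
    # Fires when the consumer halted after a call to Consumer.cancel().
    print("consumer canceled cleanly")


def on_failure(failure):
    # failure is a twisted.python.failure.Failure wrapping the exception
    # that stopped the consumer (permissions, HaltConsumer, and so on).
    print("consumer stopped: %s" % failure.getErrorMessage())


def watch(consumers):
    """Attach both handlers to every consumer's result deferred."""
    for consumer in consumers:
        consumer.result.addCallbacks(on_clean_exit, on_failure)
    # Return the list so later callbacks in the chain still receive it.
    return consumers
```

Because twisted_consume() fires its deferred with the list of consumers, watch can be chained directly, for example api.twisted_consume(callback).addCallback(watch).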
consume¶
-
fedora_messaging.api.
consume
(callback, bindings=None, queues=None)[source]¶ Start a message consumer that executes the provided callback when messages are received.
This API is blocking and will not return until the process receives a signal from the operating system.
Warning
This API runs the callback in the IO loop thread. This means that if your callback runs for a length of time near the heartbeat interval, which is likely on the order of 60 seconds, the broker will kill the TCP connection and the message will be re-delivered on start-up.
If you have a long-running callback, use the twisted_consume() API for now, which runs the callback in a thread and continues to handle AMQP events while the callback runs.
The callback receives a single positional argument, the message:
>>> from fedora_messaging import api
>>> def my_callback(message):
...     print(message)
>>> bindings = [{'exchange': 'amq.topic', 'queue': 'demo', 'routing_keys': ['#']}]
>>> queues = {
...     "demo": {"durable": False, "auto_delete": True, "exclusive": True, "arguments": {}}
... }
>>> api.consume(my_callback, bindings=bindings, queues=queues)
If the bindings and queue arguments are not provided, they will be loaded from the configuration.
For complete documentation on writing consumers, see the Consumers documentation.
Parameters:
- callback (callable) – A callable object that accepts one positional argument, a Message, or a class object that implements the __call__ method. The class will be instantiated before use.
- bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.
- queues (dict) – The queue or queues to declare and consume from. This should be in the same format as the queues configuration dictionary where each key is a queue name and each value is a dictionary of settings for that queue.
Raises:
- fedora_messaging.exceptions.HaltConsumer – If the consumer requests that it be stopped.
- ValueError – If the provided callback is neither a function nor a class that implements __call__, if the bindings argument is not a dict or list of dicts with the proper keys, or if the queues argument isn’t a dict with the proper keys.
Signals¶
Signals sent by fedora_messaging APIs using blinker.base.Signal
signals.
pre_publish_signal¶
-
fedora_messaging.api.
pre_publish_signal
= <blinker.base.NamedSignal object at 0x7f69ed355b38; 'pre_publish'>¶ A signal triggered before the message is published. The signal handler should accept a single keyword argument,
message, which is the instance of the fedora_messaging.message.Message being sent. It is acceptable to mutate the message, but the validate method will be called on it after this signal.
publish_signal¶
-
fedora_messaging.api.
publish_signal
= <blinker.base.NamedSignal object at 0x7f69ec9c95f8; 'publish_success'>¶ A signal triggered after a message is published successfully. The signal handler should accept a single keyword argument,
message, which is the instance of the fedora_messaging.message.Message that was sent.
publish_failed_signal¶
-
fedora_messaging.api.
publish_failed_signal
= <blinker.base.NamedSignal object at 0x7f69ec9c9630; 'publish_failed_signal'>¶ A signal triggered after a message fails to publish for some reason. The signal handler should accept two keyword arguments: message, which is the instance of the fedora_messaging.message.Message that failed to be sent, and error, the exception that was raised.
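A minimal sketch of handlers for the signals above follows. blinker passes the sender as the first positional argument and the documented values as keyword arguments, so each handler takes sender plus the keyword arguments listed for its signal. The logger name, the FAILED list and install_handlers() are assumptions made for this example:

```python
import logging

log = logging.getLogger("myapp.messaging")  # hypothetical logger name

# Failures are also collected here so an application could report them later.
FAILED = []


def on_pre_publish(sender, message):
    # Called before validation, so mutating the message here is still allowed.
    log.debug("about to publish %r", message)


def on_publish_failed(sender, message, error):
    FAILED.append((message, error))
    log.warning("publish failed: %s", error)


def install_handlers():
    """Connect the handlers; the import is kept local to this function."""
    from fedora_messaging import api

    api.pre_publish_signal.connect(on_pre_publish)
    api.publish_failed_signal.connect(on_publish_failed)
```

blinker holds receivers by weak reference by default, which is one reason to use module-level functions as handlers instead of short-lived lambdas.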
Message Schemas¶
This module defines the base class of message objects and keeps a registry of known message implementations. This registry is populated from Python entry points in the “fedora.messages” group.
To implement your own message schema, simply create a class that inherits the
Message
class, and add an entry point in your Python package under the
“fedora.messages” group. For example, an entry point for the Message
schema would be:
entry_points = {
'fedora.messages': [
'base.message=fedora_messaging.message:Message'
]
}
The entry point name must be unique to your application and is used to map
messages to your message class, so it’s best to prefix it with your application
name (e.g. bodhi.new_update_messageV1). When publishing, the Fedora
Messaging library will add a header with the entry point name of the class used
so the consumer can locate the correct schema.
Since every client needs to have the message schema installed, you should define this class in a small Python package of its own.
Message¶
-
class
fedora_messaging.message.
Message
(body=None, headers=None, topic=None, properties=None, severity=None)[source]¶ Messages are simply JSON-encoded objects. This allows message authors to define a schema and implement Python methods to abstract the raw message from the user. This allows the schema to change and evolve without breaking the user-facing API.
There are a number of properties that are intended to be overridden by users. These fields are used to sort messages for notifications or are used to create human-readable versions of the messages. Properties that are intended for this purpose are noted in their attribute documentation below.
Parameters: - headers (dict) – A set of message headers. Consult the headers schema for expected keys and values.
- body (dict) – The message body. Consult the body schema for expected keys and values. This dictionary must be JSON-serializable by the default serializer.
- topic (six.text_type) – The message topic as a unicode string. If this is not provided, the default topic for the class is used. See the attribute documentation below for details.
- properties (pika.BasicProperties) – The AMQP properties. If this is not provided, they will be generated. Most users should not need to provide this, but it can be useful in testing scenarios.
- severity (int) – An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be DEBUG, INFO, WARNING, or ERROR. The default is INFO, and can be set as a class attribute or on an instance-by-instance basis.
-
id
¶ The message id as a unicode string. This attribute is automatically generated and set by the library and users should only set it themselves in testing scenarios.
Type: six.text_type
-
topic
¶ The message topic as a unicode string. The topic is used by message consumers to filter what messages they receive. Topics should be a string of words separated by ‘.’ characters, with a length limit of 255 bytes. Because of this byte limit, it is best to avoid non-ASCII characters. Topics should start general and get more specific with each word. For example, “bodhi.update.kernel” is a possible topic: “bodhi” identifies the application, “update” identifies the message, and “kernel” identifies the package in the update. This can be set at the class level or at the instance level. Dynamic, specific topics that allow for fine-grained filtering are preferred.
Type: six.text_type
-
headers_schema
¶ A JSON schema to be used with jsonschema.validate() to validate the message headers. For most users, the default definition should suffice.
Type: dict
-
body_schema
¶ A JSON schema to be used with jsonschema.validate() to validate the message body. The body_schema is retrieved on a message instance so it is not required to be a class attribute, although this is a convenient approach. Users are also free to write the JSON schema as a file and load the file from the filesystem or network if they prefer.
Type: dict
-
body
¶ The message body as a Python dictionary. This is validated by the body schema before publishing and before consuming.
Type: dict
-
severity
¶ An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be DEBUG, INFO, WARNING, or ERROR. The default is INFO, and can be set as a class attribute or on an instance-by-instance basis.
Type: int
-
queue
¶ The name of the queue this message arrived through. This attribute is set automatically by the library and users should never set it themselves.
Type: str
-
__str__
()[source]¶ A human-readable representation of this message.
This should provide a detailed, long-form representation of the message. The default implementation is to format the raw message id, topic, headers, and body.
Note
Sub-classes should override this method. It is used to create the body of email notifications and by other tools to display messages to humans.
-
agent_avatar
¶ A URL to the avatar of the user who caused the action.
Note
Sub-classes should override this method if the message was triggered by a particular user.
Returns: The URL to the user’s avatar. Return type: str or None
-
app_icon
¶ A URL to the icon of the application that generated the message.
Note
Sub-classes should override this method if their application has an icon and they wish that image to appear in applications that consume messages.
Returns: The URL to the app’s icon. Return type: str or None
-
containers
¶ List of containers affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to one or more container images. The data returned from this property is used to filter notifications.
Returns: A list of affected container names. Return type: list(str)
-
flatpaks
¶ List of flatpaks affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to one or more flatpaks. The data returned from this property is used to filter notifications.
Returns: A list of affected flatpak names. Return type: list(str)
-
modules
¶ List of modules affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to one or more modules. The data returned from this property is used to filter notifications.
Returns: A list of affected module names. Return type: list(str)
-
packages
¶ List of RPM packages affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to one or more RPM packages. The data returned from this property is used to filter notifications.
Returns: A list of affected package names. Return type: list(str)
-
summary
¶ A short, human-readable representation of this message.
This should provide a short summary of the message, much like the subject line of an email.
Note
Sub-classes should override this method. It is used to create the subject of email notifications and IRC notifications, and by other tools to display messages to humans in short form.
The default implementation is to simply return the message topic.
-
url
¶ A URL to the action that caused this message to be emitted.
Note
Sub-classes should override this method if there is a URL associated with the message.
Returns: A relevant URL. Return type: str or None
-
usernames
¶ List of users affected by the action that generated this message.
Note
Sub-classes should override this method if the message pertains to a user or users. The data returned from this property is used to filter notifications.
Returns: A list of affected usernames. Return type: list(str)
-
validate
()[source]¶ Validate the headers and body with the message schema, if any.
In addition to the user-provided schema, all messages are checked against the base schema, which requires certain message headers and that the body be a JSON object.
Warning
This method should not be overridden by sub-classes.
Raises:
- jsonschema.ValidationError – If either the message headers or the message body are invalid.
- jsonschema.SchemaError – If either the message header schema or the message body schema are invalid.
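The topic guidelines given for the topic attribute above (dot-separated words, a 255-byte limit, ASCII preferred) can be checked before a message is ever built. This is only a sketch of such a check: the function name check_topic is hypothetical, it is not part of validate(), and measuring the limit on the UTF-8 encoding is an assumption, since the text above does not name an encoding:

```python
def check_topic(topic, limit=255):
    """Return a list of problems with a topic; an empty list means none found."""
    problems = []
    # Assumption: the byte limit applies to the UTF-8 encoded topic.
    if len(topic.encode("utf-8")) > limit:
        problems.append("longer than %d bytes" % limit)
    # Words are separated by '.', so an empty word means a stray or doubled dot.
    if any(not word for word in topic.split(".")):
        problems.append("contains an empty word")
    # str.isascii() requires Python 3.7 or later.
    if not topic.isascii():
        problems.append("contains non-ASCII characters")
    return problems


print(check_topic("bodhi.update.kernel"))
```

A publisher could run a check like this in its own tests so that overly long or malformed topics are caught during development.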
Message Severity¶
Each message can have a severity associated with it. The severity is used by applications like the notification service to determine what messages to send to users. The severity can be set at the class level, or on a message-by-message basis. The following are valid severity levels:
DEBUG¶
-
fedora_messaging.message.
DEBUG
= 10¶ Indicates the message is for debugging or is otherwise very low priority. Users will not be notified unless they’ve explicitly requested DEBUG level messages.
INFO¶
-
fedora_messaging.message.
INFO
= 20¶ Indicates the message is informational. End users will not receive notifications for these messages by default. For example, automated tests passed for their package.
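Since severities are plain integers, a consumer can compare them directly. The sketch below mirrors the two values documented above as local constants so that it runs on its own; in real code they would be imported from fedora_messaging.message. The at_least() helper and the sample data are hypothetical, and how the notification service itself filters messages is not described here:

```python
# Local copies of the documented values, for illustration only.
DEBUG = 10
INFO = 20


def at_least(messages, minimum):
    """Keep the (severity, summary) pairs whose severity is >= minimum."""
    return [pair for pair in messages if pair[0] >= minimum]


# Made-up sample data.
received = [(DEBUG, "cache refreshed"), (INFO, "tests passed for kernel")]
print(at_least(received, INFO))
```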
Utilities¶
The schema_utils
module contains utilities that may be useful when writing
the Python API of your message schemas.
libravatar_url¶
-
fedora_messaging.schema_utils.
libravatar_url
(email=None, openid=None, size=64, default='retro')[source]¶ Get the URL to an avatar from libravatar.
Either the user’s email or openid must be provided.
If you want to use Libravatar federation (through DNS), you should install and use the libravatar library instead. Check out the libravatar.libravatar_url() function.
Returns: The URL to the avatar image.
Raises: ValueError – If neither email nor openid are provided.
Exceptions¶
Exceptions raised by Fedora Messaging.
-
exception
fedora_messaging.exceptions.
BadDeclaration
(obj_type=None, description=None, reason=None)[source]¶ Raised when declaring an object in AMQP fails.
Parameters:
-
exception
fedora_messaging.exceptions.
BaseException
[source]¶ The base class for all exceptions raised by fedora_messaging.
-
exception
fedora_messaging.exceptions.
ConfigurationException
(message)[source]¶ Raised when there’s an invalid configuration setting
Parameters: message (str) – A detailed description of the configuration problem which is presented to the user.
-
exception
fedora_messaging.exceptions.
ConnectionException
(*args, **kwargs)[source]¶ Raised if a general connection error occurred.
You may handle this exception by logging it and resending or discarding the message.
-
exception
fedora_messaging.exceptions.
ConsumeException
[source]¶ Base class for exceptions related to consuming.
-
exception
fedora_messaging.exceptions.
ConsumerCanceled
[source]¶ Raised when the server has canceled the consumer.
This can happen when the queue the consumer is subscribed to is deleted, or when the node the queue is located on fails.
-
exception
fedora_messaging.exceptions.
Drop
[source]¶ Consumer callbacks should raise this to indicate they wish the message they are currently processing to be dropped.
-
exception
fedora_messaging.exceptions.
HaltConsumer
(exit_code=0, reason=None, requeue=False, **kwargs)[source]¶ Consumer callbacks should raise this exception if they wish the consumer to be shut down.
Parameters:
-
exception
fedora_messaging.exceptions.
Nack
[source]¶ Consumer callbacks should raise this to indicate they wish the message they are currently processing to be re-queued.
-
exception
fedora_messaging.exceptions.
NoFreeChannels
[source]¶ Raised when a connection has reached its channel limit
-
exception
fedora_messaging.exceptions.
PermissionException
(obj_type=None, description=None, reason=None)[source]¶ Generic permissions exception.
Parameters: - obj_type (str) – The type of object being accessed that caused the permission error. May be None if the cause is unknown.
- description (object) – The description of the object, if any. May be None.
- reason (str) – The reason the server gave for the permission error, if any. If no reason is supplied by the server, this should be the best guess for what caused the error.
-
exception
fedora_messaging.exceptions.
PublishException
(reason=None, **kwargs)[source]¶ Base class for exceptions related to publishing.
-
exception
fedora_messaging.exceptions.
PublishReturned
(reason=None, **kwargs)[source]¶ Raised when the broker rejects and returns the message to the publisher.
You may handle this exception by logging it and resending or discarding the message.
-
exception
fedora_messaging.exceptions.
ValidationError
[source]¶ This error is raised when a message fails validation with its JSON schema.
This exception can be raised on an incoming or outgoing message. There is no need to catch this exception when publishing; it should warn you during development and testing that you’re trying to publish a message with a different format, and that you should either fix the message or update the schema.
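The Drop, Nack and HaltConsumer exceptions above are how a consumer callback reports its decision about a message. One possible arrangement, sketched here under stated assumptions, keeps the decision in a plain function and raises the matching exception from the callback. The classify() policy and the body keys "package", "shutdown" and "backend_busy" are invented for this example:

```python
def classify(body):
    """Hypothetical policy: decide what to do with a message body."""
    if not isinstance(body, dict) or "package" not in body:
        return "drop"  # unusable for this consumer; retrying would not help
    if body.get("shutdown"):
        return "halt"
    if body.get("backend_busy"):
        return "nack"  # ask for the message to be re-queued
    return "ack"


def run():
    """Start a blocking consumer; the imports are kept local to this function."""
    from fedora_messaging import api, exceptions

    def callback(message):
        action = classify(message.body)
        if action == "drop":
            raise exceptions.Drop()
        if action == "nack":
            raise exceptions.Nack()
        if action == "halt":
            raise exceptions.HaltConsumer(exit_code=0, reason="shutdown requested")
        print(message.summary)

    # Bindings and queues are loaded from the configuration when omitted.
    api.consume(callback)
```

Keeping classify() free of library imports means the policy can be unit-tested without a broker.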
Configuration¶
conf¶
-
fedora_messaging.config.
conf
= {}¶ The configuration dictionary used by fedora-messaging and consumers.
DEFAULTS¶
-
fedora_messaging.config.
DEFAULTS
= {'amqp_url': 'amqp://?connection_attempts=3&retry_delay=5', 'bindings': [{'queue': '29ce8fa6-e303-4e70-a4fa-7a088be6447f', 'exchange': 'amq.topic', 'routing_keys': ['#']}], 'callback': None, 'client_properties': {'app': 'Unknown', 'information': 'https://fedora-messaging.readthedocs.io/en/stable/', 'product': 'Fedora Messaging with Pika', 'version': 'fedora_messaging-1.7.2 with pika-1.1.0'}, 'consumer_config': {}, 'exchanges': {'amq.topic': {'arguments': {}, 'auto_delete': False, 'durable': True, 'type': 'topic'}}, 'log_config': {'disable_existing_loggers': False, 'formatters': {'simple': {'format': '[%(name)s %(levelname)s] %(message)s'}}, 'handlers': {'console': {'class': 'logging.StreamHandler', 'formatter': 'simple', 'stream': 'ext://sys.stdout'}}, 'loggers': {'fedora_messaging': {'handlers': ['console'], 'level': 'INFO', 'propagate': False}}, 'root': {'handlers': ['console'], 'level': 'WARNING'}, 'version': 1}, 'passive_declares': False, 'publish_exchange': 'amq.topic', 'qos': {'prefetch_count': 10, 'prefetch_size': 0}, 'queues': {'29ce8fa6-e303-4e70-a4fa-7a088be6447f': {'arguments': {}, 'auto_delete': True, 'durable': False, 'exclusive': False}}, 'tls': {'ca_cert': None, 'certfile': None, 'keyfile': None}, 'topic_prefix': ''}¶ The default configuration settings for fedora-messaging. This should not be modified and should be copied with
copy.deepcopy().
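The advice to copy DEFAULTS with copy.deepcopy() matters because the dictionary contains nested dictionaries such as qos. The sketch below uses a trimmed stand-in for DEFAULTS, with values taken from the listing above, so that it runs without the library; custom_settings() is a hypothetical helper:

```python
import copy

# Trimmed stand-in for fedora_messaging.config.DEFAULTS (illustration only).
DEFAULTS = {
    "publish_exchange": "amq.topic",
    "qos": {"prefetch_count": 10, "prefetch_size": 0},
}


def custom_settings(prefetch_count):
    """Return an independent copy of DEFAULTS with a different prefetch count."""
    settings = copy.deepcopy(DEFAULTS)
    # Safe: the nested "qos" dict belongs to the copy, not to DEFAULTS.
    settings["qos"]["prefetch_count"] = prefetch_count
    return settings


settings = custom_settings(1)
print(settings["qos"], DEFAULTS["qos"])
```

A shallow copy such as dict(DEFAULTS) would share the nested qos dictionary, so the same assignment would silently change the defaults as well.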
Twisted¶
In addition to the synchronous API, a Twisted API is provided for applications that need an asynchronous API. This API requires Twisted 16.1.0 or greater.
Note
This API is deprecated; please use fedora_messaging.api.twisted_consume() instead.
Protocol¶
The core Twisted interface. A protocol represents a specific connection to the AMQP broker.
The FedoraMessagingProtocolV2 has replaced the deprecated FedoraMessagingProtocol. This class inherits the pika.adapters.twisted_connection.TwistedProtocolConnection class and adds a few additional methods.
When combined with the fedora_messaging.twisted.factory.FedoraMessagingFactory class, it’s easy to create AMQP consumers that last across connections.
For an overview of Twisted clients, see the Twisted client documentation.
-
class
fedora_messaging.twisted.protocol.
FedoraMessagingProtocol
(parameters, confirms=True)[source]¶ A Twisted Protocol for the Fedora Messaging system.
This protocol builds on the generic pika AMQP protocol to add calls specific to the Fedora Messaging implementation.
Warning
This class is deprecated, use the FedoraMessagingProtocolV2.
Parameters:
- parameters (pika.ConnectionParameters) – The connection parameters.
- confirms (bool) – If True, all outgoing messages will require a confirmation from the server, and the Deferred returned from the publish call will wait for that confirmation.
-
cancel
(queue)[source]¶ Cancel the consumer for a queue.
Parameters: queue (str) – The name of the queue the consumer is subscribed to.
Returns: A Deferred that fires when the consumer is canceled, or None if the consumer was already canceled. Wrap the call in defer.maybeDeferred() to always receive a Deferred.
Return type: defer.Deferred
-
consume
(callback, queue)[source]¶ Register a message consumer that executes the provided callback when messages are received.
The queue must exist prior to calling this method. If a consumer already exists for the given queue, the callback is simply updated and any new messages for that consumer use the new callback.
If resumeProducing() has not been called when this method is called, it will be called for you.
Parameters:
- callback (callable) – The callback to invoke when a message is received.
- queue (str) – The name of the queue to consume from.
Returns: A namedtuple that identifies this consumer.
Raises: NoFreeChannels – If there are no available channels on this connection. If this occurs, you can either reduce the number of consumers on this connection or create an additional connection.
-
pauseProducing
()[source]¶ Pause the reception of messages by canceling all existing consumers. This does not disconnect from the server.
Message reception can be resumed with resumeProducing().
Returns: fired when the production is paused. Return type: Deferred
-
resumeProducing
()[source]¶ Starts or resumes the retrieval of messages from the server queue.
This method starts receiving messages from the server, they will be passed to the consumer callback.
Note
This is called automatically when consume() is called, so users should not need to call this unless pauseProducing() has been called.
Returns: fired when the production is ready to start. Return type: defer.Deferred
-
class
fedora_messaging.twisted.protocol.
Consumer
(tag, queue, callback, channel)¶ A namedtuple that represents an AMQP consumer.
This is deprecated. Use fedora_messaging.twisted.consumer.Consumer.
- The tag field is the consumer’s AMQP tag (str).
- The queue field is the name of the queue it’s consuming from (str).
- The callback field is the function called for each message (a callable).
- The channel is the AMQP channel used for the consumer (pika.adapters.twisted_connection.TwistedChannel).
-
callback
¶ Alias for field number 2
-
channel
¶ Alias for field number 3
-
queue
¶ Alias for field number 1
-
tag
¶ Alias for field number 0
Factory¶
A Twisted Factory for creating and configuring instances of the
FedoraMessagingProtocol
.
A factory is used to implement automatic re-connections by producing protocol instances (connections) on demand. Twisted uses factories for its services APIs.
See the Twisted client documentation for more information.
-
class
fedora_messaging.twisted.factory.
FedoraMessagingFactory
(parameters, confirms=True, exchanges=None, queues=None, bindings=None)[source]¶ Reconnecting factory for the Fedora Messaging protocol.
-
buildProtocol
(addr)[source]¶ Create the Protocol instance.
See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.
-
cancel
(queue)[source]¶ Cancel the consumer for a queue.
This removes the consumer from the list of consumers to be configured for every connection.
Parameters: queue (str) – The name of the queue the consumer is subscribed to.
Returns: Either a Deferred that fires when the consumer is canceled, or None if the consumer was already canceled. Wrap the call in defer.maybeDeferred() to always receive a Deferred.
Return type: defer.Deferred or None
-
clientConnectionFailed
(connector, reason)[source]¶ Called when the client has failed to connect to the broker.
See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.
-
clientConnectionLost
(connector, reason)[source]¶ Called when the connection to the broker has been lost.
See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.
-
consume
(callback, queue)[source]¶ Register a new consumer.
This consumer will be configured for every protocol this factory produces so it will be reconfigured on network failures. If a connection is already active, the consumer will be added to it.
Parameters: - callback (callable) – The callback to invoke when a message arrives.
- queue (str) – The name of the queue to consume from.
-
protocol
¶ alias of
fedora_messaging.twisted.protocol.FedoraMessagingProtocol
-
publish
(message, exchange=None)[source]¶ Publish a fedora_messaging.message.Message to an exchange on the message broker. This call will survive connection failures and try until it succeeds or is canceled.
Parameters:
- message (message.Message) – The message to publish.
- exchange (str) – The name of the AMQP exchange to publish to; defaults to publish_exchange
Returns: A deferred that fires when the message is published.
Return type: defer.Deferred
Raises:
- PublishReturned – If the published message is rejected by the broker.
- ConnectionException – If a connection error occurs while publishing. Calling this method again will wait for the next connection and publish when it is available.
-
startedConnecting
(connector)[source]¶ Called when the connection to the broker has started.
See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.
-
stopFactory
()[source]¶ Stop the factory.
See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.
-
-
class
fedora_messaging.twisted.factory.
FedoraMessagingFactoryV2
(parameters, confirms=True)[source]¶ Reconnecting factory for the Fedora Messaging protocol.
-
buildProtocol
(addr)[source]¶ Create the Protocol instance.
See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.
-
cancel
(consumers)[source]¶ Cancel a consumer that was previously started with consume.
Parameters: consumers (list of fedora_messaging.api.Consumer) – The consumers to cancel.
-
consume
(callback, bindings, queues)[source]¶ Start a consumer that lasts across individual connections.
Parameters:
- callback (callable) – A callable object that accepts one positional argument, a Message, or a class object that implements the __call__ method. The class will be instantiated before use.
- bindings (dict or list of dict) – Bindings to declare before consuming. This should be the same format as the bindings configuration.
- queues (dict) – The queues to declare and consume from. Each key in this dictionary is a queue, and each value is its settings as a dictionary. These settings dictionaries should have the “durable”, “auto_delete”, “exclusive”, and “arguments” keys. Refer to queues for details on their meanings.
Returns: A deferred that fires with the list of one or more fedora_messaging.twisted.consumer.Consumer objects. These can be passed to the FedoraMessagingFactoryV2.cancel() API to halt them. Each consumer object has a result instance variable that is a Deferred that fires or errors when the consumer halts. The Deferred may error back with a BadDeclaration if the user does not have permissions to consume from the queue.
Return type: defer.Deferred
-
stopFactory
()[source]¶ Stop the factory.
See the documentation of twisted.internet.protocol.ReconnectingClientFactory for details.
-
when_connected
()[source]¶ Retrieve the currently-connected Protocol, or the next one to connect.
Returns: A Deferred that fires with a connected FedoraMessagingProtocolV2 instance. This is similar to the whenConnected method from the Twisted endpoints APIs, which sadly isn’t available before Twisted 16.1.0, a release that isn’t available in EL7.
Return type: defer.Deferred
-
Service¶
Twisted Service to start and stop the Fedora Messaging Twisted Factory.
This Service makes it easier to build a Twisted application that embeds a
Fedora Messaging component. See the verify_missing
service in
fedmsg-migration-tools for a use case.
See https://twistedmatrix.com/documents/current/core/howto/application.html
-
class
fedora_messaging.twisted.service.
FedoraMessagingService
(amqp_url=None, exchanges=None, queues=None, bindings=None, consumers=None)[source]¶ A Twisted service to connect to the Fedora Messaging broker.
Parameters:
- on_message (callable|None) – Callback that will be passed each incoming message. If None, no message consuming is set up.
- amqp_url (str) – URL to use for the AMQP server.
- exchanges (list of dicts) – List of exchanges to declare at the start of every connection. Each dictionary is passed to pika.channel.Channel.exchange_declare() as keyword arguments, so any parameter to that method is a valid key.
- queues (list of dicts) – List of queues to declare at the start of every connection. Each dictionary is passed to pika.channel.Channel.queue_declare() as keyword arguments, so any parameter to that method is a valid key.
- bindings (list of dicts) – A list of bindings to be created between queues and exchanges. Each dictionary is passed to pika.channel.Channel.queue_bind(). The “queue” and “exchange” keys are required.
- consumers (dict) – A dictionary where each key is a queue name and the value is a callable object to handle messages on that queue. Consumers will be set up after each connection is established so they will survive networking issues.
-
factoryClass
¶ alias of
fedora_messaging.twisted.factory.FedoraMessagingFactory