Source code for fedora_messaging.message

"""
This module defines the base class of message objects and keeps a registry of
known message implementations. This registry is populated from Python entry
points in the "fedora.messages" group.

To implement your own message schema, simply create a class that inherits the
:class:`Message` class, and add an entry point in your Python package under the
"fedora.messages" group. For example, an entry point for the :class:`Message`
schema would be::

    entry_points = {
        'fedora.messages': [
            'base.message=fedora_messaging.message:Message'
        ]
    }

The entry point name must be unique to your application and is used to map
messages to your message class, so it's best to prefix it with your application
name (e.g. ``bodhi.new_update_messageV1``). When publishing, the Fedora
Messaging library will add a header with the entry point name of the class used
so the consumer can locate the correct schema.

Since every client needs to have the message schema installed, you should define
this class in a small Python package of its own.
"""

import datetime
import json
import logging
import uuid

import jsonschema
import pika
import pkg_resources

from . import config
from .exceptions import ValidationError
from .schema_utils import user_avatar_url


#: Indicates the message is for debugging or is otherwise very low priority. Users
#: will not be notified unless they've explicitly requested DEBUG level messages.
DEBUG = 10

#: Indicates the message is informational. End users will not receive notifications
#: for these messages by default. For example, automated tests passed for their
#: package.
INFO = 20

#: Indicates a problem or an otherwise important problem. Users are notified of
#: these messages when they pertain to packages they are associated with by default.
#: For example, one or more automated tests failed against their package.
WARNING = 30

#: Indicates a critically important message that users should act upon as soon as
#: possible. For example, their package no longer builds.
ERROR = 40

#: A tuple of all valid severity levels
SEVERITIES = (DEBUG, INFO, WARNING, ERROR)


_log = logging.getLogger(__name__)

# Maps string names of message types to classes and back
_schema_name_to_class = {}
_class_to_schema_name = {}

# Used to load the registry automatically on first use
_registry_loaded = False


def get_class(schema_name):
    """
    Retrieve the message class associated with the schema name.

    If no match is found, the default schema is returned and a warning is logged.

    Args:
        schema_name (str): The name of the :class:`Message` sub-class;
            this is typically the Python path.

    Returns:
        Message: A sub-class of :class:`Message` to create the message from.
    """
    global _registry_loaded
    if not _registry_loaded:
        load_message_classes()

    try:
        return _schema_name_to_class[schema_name]
    except KeyError:
        _log.warning(
            'The schema "%s" is not in the schema registry! Either install '
            "the package with its schema definition or define a schema. "
            "Falling back to the default schema...",
            schema_name,
        )
        return Message


def get_name(cls):
    """
    Retrieve the schema name associated with a message class.

    Returns:
        str: The schema name.

    Raises:
        TypeError: If the message class isn't registered. Check your entry point
            for correctness.
    """
    global _registry_loaded
    if not _registry_loaded:
        load_message_classes()

    try:
        return _class_to_schema_name[cls]
    except KeyError:
        raise TypeError(
            "The class {} is not in the message registry, which indicates it is"
            ' not in the current list of entry points for "fedora_messaging".'
            " Please check that the class has been added to your package's"
            " entry points.".format(repr(cls))
        )


def load_message_classes():
    """Load the 'fedora.messages' entry points and register the message classes."""
    for message in pkg_resources.iter_entry_points("fedora.messages"):
        cls = message.load()
        _log.info(
            "Registering the '%s' key as the '%r' class in the Message "
            "class registry",
            message.name,
            cls,
        )
        _schema_name_to_class[message.name] = cls
        _class_to_schema_name[cls] = message.name
    global _registry_loaded
    _registry_loaded = True


def get_message(routing_key, properties, body):
    """
    Construct a Message instance given the routing key, the properties and the
    body received from the AMQP broker.

    Args:
        routing_key (str): The AMQP routing key (will become the message topic)
        properties (pika.BasicProperties): the AMQP properties
        body (bytes): The encoded message body

    Raises:
        ValidationError: If Message validation failed or message body
            docoding/loading is impossible.
    """
    if properties.headers is None:
        _log.error(
            "Message (body=%r) arrived without headers. " "A publisher is misbehaving!",
            body,
        )
        properties.headers = {}

    try:
        MessageClass = get_class(properties.headers["fedora_messaging_schema"])
    except KeyError:
        _log.error(
            "Message (headers=%r, body=%r) arrived without a schema header."
            " A publisher is misbehaving!",
            properties.headers,
            body,
        )
        MessageClass = Message

    try:
        severity = properties.headers["fedora_messaging_severity"]
    except KeyError:
        _log.error(
            "Message (headers=%r, body=%r) arrived without a severity."
            " A publisher is misbehaving! Defaulting to INFO.",
            properties.headers,
            body,
        )
        severity = INFO

    if properties.content_encoding is None:
        _log.error("Message arrived without a content encoding")
        properties.content_encoding = "utf-8"
    try:
        body = body.decode(properties.content_encoding)
    except UnicodeDecodeError as e:
        _log.error(
            "Unable to decode message body %r with %s content encoding",
            body,
            properties.content_encoding,
        )
        raise ValidationError(e)

    try:
        body = json.loads(body)
    except ValueError as e:
        _log.error("Failed to load message body %r, %r", body, e)
        raise ValidationError(e)

    message = MessageClass(
        body=body, topic=routing_key, properties=properties, severity=severity
    )
    try:
        message.validate()
        _log.debug("Successfully validated message %r", message)
    except jsonschema.exceptions.ValidationError as e:
        _log.error("Message validation of %r failed: %r", message, e)
        raise ValidationError(e)

    if MessageClass.deprecated:
        _log.warning(
            "A message with a deprecated schema (%s.%s) has been received on topic %r. "
            "You should check the emitting application's documentation to upgrade to "
            "the newer schema version.",
            MessageClass.__module__,
            MessageClass.__name__,
            message.topic,
        )
    return message


[docs] class Message: """ Messages are simply JSON-encoded objects. This allows message authors to define a schema and implement Python methods to abstract the raw message from the user. This allows the schema to change and evolve without breaking the user-facing API. There are a number of properties that are intended to be overridden by users. These fields are used to sort messages for notifications or are used to create human-readable versions of the messages. Properties that are intended for this purpose are noted in their attribute documentation below. Args: headers (dict): A set of message headers. Consult the headers schema for expected keys and values. body (dict): The message body. Consult the body schema for expected keys and values. This dictionary must be JSON-serializable by the default serializer. topic (str): The message topic as a unicode string. If this is not provided, the default topic for the class is used. See the attribute documentation below for details. properties (pika.BasicProperties): The AMQP properties. If this is not provided, they will be generated. Most users should not need to provide this, but it can be useful in testing scenarios. severity (int): An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be :data:`DEBUG`, :data:`INFO`, :data:`WARNING`, or :data:`ERROR`. The default is :data:`INFO`, and can be set as a class attribute or on an instance-by-instance basis. Attributes: id (str): The message id as a unicode string. This attribute is automatically generated and set by the library and users should only set it themselves in testing scenarios. topic (str): The message topic as a unicode string. The topic is used by message consumers to filter what messages they receive. Topics should be a string of words separated by '.' characters, with a length limit of 255 bytes. Because of this byte limit, it is best to avoid non-ASCII character. Topics should start general and get more specific each word. For example: "bodhi.update.kernel" is a possible topic. "bodhi" identifies the application, "update" identifies the message, and "kernel" identifies the package in the update. This can be set at a class level or on a instance level. Dynamic, specific topics that allow for fine-grain filtering are preferred. headers_schema (dict): A `JSON schema <http://json-schema.org/>`_ to be used with :func:`jsonschema.validate` to validate the message headers. For most users, the default definition should suffice. body_schema (dict): A `JSON schema <http://json-schema.org/>`_ to be used with :func:`jsonschema.validate` to validate the message body. The body_schema is retrieved on a message instance so it is not required to be a class attribute, although this is a convenient approach. Users are also free to write the JSON schema as a file and load the file from the filesystem or network if they prefer. body (dict): The message body as a Python dictionary. This is validated by the body schema before publishing and before consuming. severity (int): An integer that indicates the severity of the message. This is used to determine what messages to notify end users about and should be :data:`DEBUG`, :data:`INFO`, :data:`WARNING`, or :data:`ERROR`. The default is :data:`INFO`, and can be set as a class attribute or on an instance-by-instance basis. queue (str): The name of the queue this message arrived through. This attribute is set automatically by the library and users should never set it themselves. deprecated (bool): Whether this message schema has been deprecated by a more recent version. Emits a warning when a message of this class is received, to let consumers know that they should plan to upgrade. Defaults to ``False``. priority (int): The priority for the message, if the destination queue supports it. Defaults to zero (lowest priority). This value is taken into account in queues that have the ``x-max-priority`` argument set. Most queues in Fedora don't support priorities, in which case the value will be ignored. Larger numbers indicate higher priority, you can read more about it in `RabbitMQ's documentation on priority`_. .. _RabbitMQ's documentation on priority: https://www.rabbitmq.com/priority.html """ severity = INFO topic = "" headers_schema = { "$schema": "http://json-schema.org/draft-04/schema#", "description": "Schema for message headers", "type": "object", "properties": { "fedora_messaging_severity": { "type": "number", "enum": [DEBUG, INFO, WARNING, ERROR], }, "fedora_messaging_schema": {"type": "string"}, "sent-at": {"type": "string"}, }, } body_schema = { "$schema": "http://json-schema.org/draft-04/schema#", "description": "Schema for message body", "type": "object", } deprecated = False def __init__( self, body=None, headers=None, topic=None, properties=None, severity=None ): self.body = body or {} if topic: # Default is "" on base class self.topic = topic headers = headers or {} if severity: self.severity = severity self._properties = properties or self._build_properties(headers) self.queue = None def _build_properties(self, headers): # Consumers use this to determine what schema to use and if they're out # of date. headers = headers.copy() headers["fedora_messaging_schema"] = get_name(self.__class__) now = datetime.datetime.now(tz=datetime.timezone.utc).replace(microsecond=0) headers["sent-at"] = now.isoformat() headers["fedora_messaging_severity"] = self.severity # Mirror the priority in the headers for debugging purposes headers["priority"] = config.conf["publish_priority"] or 0 headers.update(self._filter_headers()) message_id = str(uuid.uuid4()) return pika.BasicProperties( content_type="application/json", content_encoding="utf-8", delivery_mode=2, headers=headers, message_id=message_id, priority=config.conf["publish_priority"], ) def _filter_headers(self): """ Add headers designed for filtering messages based on objects. Returns: dict: Filter-related headers to be combined with the existing headers """ headers = {} properties = [ ("user", "usernames"), ("group", "groups"), ("rpm", "packages"), ("container", "containers"), ("module", "modules"), ("flatpak", "flatpaks"), ] for header_name, prop_name in properties: try: items = getattr(self, prop_name) except Exception: # The message is probably invalid, don't add the header continue for item in items: headers[f"fedora_messaging_{header_name}_{item}"] = True return headers @property def _headers(self): """ The message headers dictionary. .. note: If there's a reason users want to use this interface, it can be made public. Please file a bug if you feel you need this. """ return self._properties.headers @_headers.setter def _headers(self, value): self._properties.headers = value @property def id(self): return self._properties.message_id @id.setter def id(self, value): self._properties.message_id = value @property def priority(self): return self._properties.priority or 0 @priority.setter def priority(self, value): value = value or 0 # convert None to 0 self._properties.priority = value # Mirror the priority in the headers for debugging purposes self._headers["priority"] = value @property def _encoded_routing_key(self): """The encoded routing key used to publish the message on the broker.""" topic = self.topic if config.conf["topic_prefix"]: topic = ".".join((config.conf["topic_prefix"].rstrip("."), topic)) return topic.encode("utf-8") @property def _encoded_body(self): """The encoded body used to publish the message.""" return json.dumps(self.body).encode("utf-8") def __repr__(self): """ Provide a printable representation of the object that can be passed to func:`eval`. """ return "{}(id={}, topic={}, body={})".format( self.__class__.__name__, repr(self.id), repr(self.topic), repr(self.body) ) def __eq__(self, other): """ Two messages of the same class with the same topic, headers, and body are equal. The "sent-at" header is excluded from the equality check as this is set automatically and is dependent on when the object is created. Args: other (object): The object to check for equality. Returns: bool: True if the messages are equal. """ if not isinstance(other, self.__class__): return False headers = self._headers.copy() other_headers = other._headers.copy() try: del headers["sent-at"] except KeyError: pass try: del other_headers["sent-at"] except KeyError: pass return ( self.topic == other.topic and self.body == other.body and headers == other_headers )
[docs] def validate(self): """ Validate the headers and body with the message schema, if any. In addition to the user-provided schema, all messages are checked against the base schema which requires certain message headers and the that body be a JSON object. .. warning:: This method should not be overridden by sub-classes. Raises: jsonschema.ValidationError: If either the message headers or the message body are invalid. jsonschema.SchemaError: If either the message header schema or the message body schema are invalid. """ for schema in (self.headers_schema, Message.headers_schema): _log.debug( 'Validating message headers "%r" with schema "%r"', self._headers, schema, ) jsonschema.validate(self._headers, schema) for schema in (self.body_schema, Message.body_schema): _log.debug( 'Validating message body "%r" with schema "%r"', self.body, schema ) jsonschema.validate(self.body, schema)
@property def summary(self): """ A short, human-readable representation of this message. This should provide a short summary of the message, much like the subject line of an email. .. note:: Sub-classes should override this method. It is used to create the subject of email notifications, IRC notification, and by other tools to display messages to humans in short form. The default implementation is to simply return the message topic. """ return self.topic
[docs] def __str__(self): """ A human-readable representation of this message. This should provide a detailed, long-form representation of the message. The default implementation is to format the raw message id, topic, headers, and body. .. note:: Sub-classes should override this method. It is used to create the body of email notifications and by other tools to display messages to humans. """ return "Id: {i}\nTopic: {t}\nHeaders: {h}\nBody: {b}".format( i=self.id, t=self.topic, h=json.dumps( self._headers, sort_keys=True, indent=4, separators=(",", ": ") ), b=json.dumps(self.body, sort_keys=True, indent=4, separators=(",", ": ")), )
@property def url(self): """ An URL to the action that caused this message to be emitted. .. note:: Sub-classes should override this method if there is a URL associated with message. Returns: str or None: A relevant URL. """ return None @property def app_name(self): """The name of the application that generated the message. .. note:: Sub-classes should override this method. Returns: str or None: The name of the application. """ return None @property def app_icon(self): """An URL to the icon of the application that generated the message. .. note:: Sub-classes should override this method if their application has an icon and they wish that image to appear in applications that consume messages. Returns: str or None: The URL to the app's icon. """ return None @property def agent_name(self): """The username of the user who caused the action. .. note:: Sub-classes should override this method if the message was triggered by a particular user. Returns: str or None: The agent's username. """ return None @property def agent_avatar(self): """An URL to the avatar of the user who caused the action. .. note:: Sub-classes should override this method if the default Libravatar and OpenID-based URL generator is not appropriate. Returns: str or None: The URL to the user's avatar. """ return user_avatar_url(self.agent_name) if self.agent_name is not None else None @property def usernames(self): """List of users affected by the action that generated this message. .. note:: Sub-classes should override this method if the message pertains to a user or users. The data returned from this property is used to filter notifications. Returns: list(str): A list of affected usernames. """ return [] @property def groups(self): """List of groups affected by the action that generated this message. .. note:: Sub-classes should override this method if the message pertains to a group or groups. The data returned from this property is used to filter notifications. Returns: list(str): A list of affected groups. """ return [] @property def packages(self): """List of RPM packages affected by the action that generated this message. .. note:: Sub-classes should override this method if the message pertains to one or more RPM packages. The data returned from this property is used to filter notifications. Returns: list(str): A list of affected package names. """ return [] @property def containers(self): """List of containers affected by the action that generated this message. .. note:: Sub-classes should override this method if the message pertains to one or more container images. The data returned from this property is used to filter notifications. Returns: list(str): A list of affected container names. """ return [] @property def modules(self): """List of modules affected by the action that generated this message. .. note:: Sub-classes should override this method if the message pertains to one or more modules. The data returned from this property is used to filter notifications. Returns: list(str): A list of affected module names. """ return [] @property def flatpaks(self): """List of flatpaks affected by the action that generated this message. .. note:: Sub-classes should override this method if the message pertains to one or more flatpaks. The data returned from this property is used to filter notifications. Returns: list(str): A list of affected flatpaks names. """ return []
#: The schema for each JSON object produced by :func:`dumps`, consumed by #: :func:`loads`, and expected by CLI commands like "fedora-messaging publish". SERIALIZED_MESSAGE_SCHEMA = { "$schema": "http://json-schema.org/draft-04/schema#", "description": "Schema for the JSON object used to represent messages in a file", "type": "object", "properties": { "topic": {"type": "string", "description": "The message topic"}, "headers": { "type": "object", "properties": Message.headers_schema["properties"], "description": "The message headers", }, "id": {"type": "string", "description": "The message's UUID."}, "body": {"type": "object", "description": "The message body."}, "queue": { "type": ["string", "null"], "description": "The queue the message arrived on, if any.", }, "priority": { "type": ["integer", "null"], "description": "The priority that the message has been sent with.", }, }, "required": ["topic", "body"], } def load_message(message_dict): """Load a message from a dictionary serialization. The dictionary must conform to the :data:`SERIALIZED_MESSAGE_SCHEMA` format. Args: message_dict (dict): The dictionary representing the message. Raises: ValidationError: If the dictionary does not pass the serialization schema. Returns: Message: The deserialized message object, as an instance of :class:`Message` or one of its subclasses. """ try: jsonschema.validate(message_dict, SERIALIZED_MESSAGE_SCHEMA) except jsonschema.exceptions.ValidationError as e: raise ValidationError(e) MessageClass = get_class( message_dict.get("headers", {}).get("fedora_messaging_schema", "base.message") ) message = MessageClass( body=message_dict["body"], topic=message_dict["topic"], headers=message_dict.get("headers"), severity=message_dict.get("headers", {}).get("fedora_messaging_severity"), ) # Restore the sent-at header try: message._headers["sent-at"] = message_dict["headers"]["sent-at"] except KeyError: pass if "id" in message_dict: message.id = message_dict["id"] message.queue = message_dict.get("queue") message.priority = message_dict.get("priority") return message
[docs] def dumps(messages): """ Serialize messages to a file format acceptable for :func:`loads` or for the publish CLI command. The format is a string where each line is a JSON object that conforms to the :data:`SERIALIZED_MESSAGE_SCHEMA` format. Args: messages (list or Message): The messages to serialize. Each message in the messages is subclass of Message. Returns: str: Serialized messages. Raises: ValidationError: If one of the messages provided doesn't conform to its schema. """ if isinstance(messages, Message): messages = [messages] serialized_messages = [] for message in messages: try: message.validate() except (jsonschema.exceptions.ValidationError, AttributeError) as e: raise ValidationError(e) m = { "topic": message.topic, "headers": message._headers, "id": message.id, "body": message.body, "queue": message.queue, "priority": message.priority, } serialized_messages.append(json.dumps(m, ensure_ascii=False, sort_keys=True)) return "\n".join(serialized_messages) + "\n"
[docs] def loads(serialized_messages): """ Deserialize messages from a file format produced by :func:`dumps`. The format is a string where each line is a JSON object that conforms to the :data:`SERIALIZED_MESSAGE_SCHEMA` format. Args: serialized_messages (str): A string made up of a JSON object per line. Returns: list: Deserialized message objects. Raises: ValidationError: If the string isn't formatted properly or message doesn't pass the message schema validation """ messages = [] for serialized_message in serialized_messages.splitlines(): try: message_dict = json.loads(serialized_message) except ValueError as e: raise ValidationError(e) message = load_message(message_dict) messages.append(message) return messages