Converting a fedmsg application

Converting publishers

Converting a Flask app

Let’s use the elections app as an example. Clone the code using the following command:

git clone https://pagure.io/elections.git

And change to this directory.

In the elections app, all calls to publish messages on fedmsg are going through the fedora_elections.fedmsgshim.publish wrapper function. We can thus modify this function to make it call Fedora Messaging instead of fedmsg.

JSON schema

First, you will need a Message schema. To write this schema you must know what kind of messages are sent on the bus. A git grep command will reveal that all calls are made from the admin.py file. Open that file and examine those calls.

In parallel, copy the docs/sample_schema_package/ directory from the fedora-messaging git clone to your app directory. Rename it to elections-messages. Edit the setup.py file like you did before, to change the package metadata (including the entry point). Use fedora_elections_messages for the name. Rename the mailman_messages directory to fedora_elections_messages and adapt the setup.py metadata.

Edit the messages.py file and write the basic structure for the elections message schema. According to the different calls in admin.py, it could be something like:

{
    'id': 'http://fedoraproject.org/message-schema/elections#',
    '$schema': 'http://json-schema.org/draft-04/schema#',
    'description': 'Schema for Fedora Elections',
    'type': 'object',
    'properties': {
        'agent': {'type': 'string'},
        'election': {'type': 'object'},
        'candidate': {'type': 'object'},
    },
    'required': ['agent', 'election'],
}

This could be sufficient, but it would be best to list what properties are available in the election and candidate keys. Unfortunately, those are just JSON dumps of the database model, so you’ll have to look further to know the structure.

Examining the to_json() methods in models.py shows which keys are dumped to JSON. The schema could be written as:

{
    'id': 'http://fedoraproject.org/message-schema/elections#',
    '$schema': 'http://json-schema.org/draft-04/schema#',
    'description': 'Schema for Fedora Elections',
    'type': 'object',
    'properties': {
        'agent': {'type': 'string'},
        'election': {
            'type': 'object',
            'properties': {
                'shortdesc': {'type': 'string'},
                'alias': {'type': 'string'},
                'description': {'type': 'string'},
                'url': {'type': 'string', 'format': 'uri'},
                'start_date': {'type': 'string'},
                'end_date': {'type': 'string'},
                'embargoed': {'type': 'number'},
                'voting_type': {'type': 'string'},
            },
            'required': [
                'shortdesc', 'alias', 'description', 'url',
                'start_date', 'end_date', 'embargoed', 'voting_type',
            ],
        },
        'candidate': {
            'type': 'object',
            'properties': {
                'name': {'type': 'string'},
                'url': {'type': 'string', 'format': 'uri'},
            },
            'required': ['name', 'url'],
        },
    },
    'required': ['agent', 'election'],
}

Use this schema and adapt the __str__() method and the summary property.

Since the schema is distributed in a separate python package, it must be added to the election app’s dependencies in requirements.txt.

Wrapper function

Now you can import this class in fedora_elections/fedmsgshim.py and use it to encapsulate the messages. The wrapper could look like:

import logging

from fedora_elections_messages.schema import Message
from fedora_messaging.api import publish as fm_publish
from fedora_messaging.exceptions import PublishReturned, PublishForbidden, ConnectionException

LOGGER = logging.getLogger(__name__)

def publish(topic, msg):
    try:
        fm_publish(Message(
            topic="fedora.elections." + topic,
            body=msg,
        ))
    except (PublishReturned, PublishForbidden) as e:
        LOGGER.warning(
            "Fedora Messaging broker rejected message %s: %s",
            msg.id, e
        )
    except ConnectionException as e:
        LOGGER.warning("Error sending the message %s: %s", msg.id, e)

With this you’ll get a couple of nice features over the previous state of things:

  • the message format is validated, so it’s your responsability to update the schema when you decide to change the format, and not the receiver’s responsability to handle any database schema changes you may make that may bleed into the message dictionary. And you’ll know during development if you break compatibility.
  • you may handle messaging errors in anyway you deem relevant. Here we’re just logging them but you could choose to re-send the messages, store them for further analysis, etc.
  • when there are no exceptions, you know that the message has reached the broker and has been distributed.

Testing

Let’s start the election app and make sure messages are properly sent on the bus. First, we’ll create a virtualenv, and install election and fedora-messaging with the following commands:

virtualenv venv
source ./venv/bin/activate
pushd elections-message-schemas
python setup.py develop
popd
pip install -r requirements.txt
python setup.py develop

Make sure the Fedora Messaging configuration file is correct in /etc/fedora-messaging/config.toml. We will add a queue binding to route messages with the fedora.elections topic to the tutorial queue. Add this entry in the bindings list:

[[bindings]]
queue = "tutorial"
exchange = "amq.topic"
routing_keys = ["fedora.elections.#"]

You could also add "fedora.elections.#" to the "routing_keys" value in the existing entry.

Now make sure that RabbitMQ is still running, and run the consume.py script we used before. Make sure it is not systematically raising exceptions in the callback function (as we did before).

Now we’ll run the election app, but first we need to create a configuration file. Create a file called config.py with the following content:

FEDORA_ELECTIONS_ADMIN_GROUP = ""

This will allow any Fedora account to be an admin on your instance, which is good enough for this tutorial. Now start the app with:

python createdb.py
python runserver.py -c config.py

Open your browser to http://localhost:5000/admin/new. Login with FAS, then create an election. Check the terminal where the consume.py script is running. You should see the message that the elections app has sent on election creation. Edit the election, and you should see the corresponding message in the terminal where consume.py is running.

Converting a Pyramid app

Let’s use the github2fedmsg app as an example. It is a Pyramid webapp that registers a webhook with Github on all subscribed projects, and then broadcasts actions (commits, pull-request, tickets) received on this webhook to the message bus.

Clone the code using the following command:

git clone git@github.com:fedora-infra/github2fedmsg.git

And change to this directory.

JSON Schema

The only call to fedmsg is in github2fedmsg/views/webhooks.py. Since the app transmits the webhook payload almost transparently to the message bus, the structure isn’t obvious, so it’s harder to define a schema. Fortunately, the Github documentation has a comprehensive list of payload formats.

It would be to long to define precise JSON schemas for each event type, so we’ll just use the generic schema.

Sending the messages

Now you can replace the current call to fedmsg with a call to fedora_messaging.api.publish. Add these lines in the github2fedmsg.views.webhook module:

import logging
from fedora_messaging.api import Message, publish
from fedora_messaging.exceptions import PublishReturned, PublishForbidden, ConnectionException

LOGGER = logging.getLogger(__name__)

And replace the call to fedmsg.publish with:

try:
    msg = Message(
        topic="github." + event_type,
        body=payload,
    )
    publish(msg)
except (PublishReturned, PublishForbidden) as e:
    LOGGER.warning(
        "Fedora Messaging broker rejected message %s: %s",
        msg.id, e
    )
except ConnectionException as e:
    LOGGER.warning("Error sending message %s: %s", msg.id, e)

Testing it

Make sure the Fedora Messaging configuration file is correct in /etc/fedora-messaging/config.toml. We will add a queue binding to route messages with the github topic to the tutorial queue. Add this entry in the bindings list:

[[bindings]]
queue = "tutorial"
exchange = "amq.topic"
routing_keys = ["github.#"]

You could also add "github.#" to the "routing_keys" value in the existing entry.

Now make sure that RabbitMQ is still running, and run the consume.py script we used before. Make sure it is not systematically raising exceptions in the callback function (as we did before).

To setup the github2fedmsg application, follow the README.rst file:

virtualenv venv
source ./venv/bin/activate
python setup.py develop
pip install waitress

Go off and register your development application with GitHub. Save the oauth tokens and add the secret one to a new file you create called secret.ini. Use the example secret.ini.example file.

Create the database and start the application:

initialize_github2fedmsg_db development.ini
pserve development.ini --reload

Converting consumers

Let’s use the-new-hotness app as an example. Clone the code and switch to state before conversion by using the following commands:

git clone https://github.com/fedora-infra/the-new-hotness.git
git checkout 0.10.1

And change to this directory.

In the-new-hotness app, all calls to consume messages on fedmsg are going through the hotness.consumers.BugzillaTicketFiler.consume method. We can thus modify this function to make it use Fedora Messaging instead of fedmsg.

Configuration

First we need to convert configuration file from fedmsg format to Fedora Messaging. Unlike fedmsg, fedora-messaging does not allow for arbitrary configuration keys.

The converted configuration config.toml could look like following:

# Define the callback function
# This will allow you to call only ``fedora-messaging consume`` without explicitly
# specifying the callback every time you starting ``the-new-hotness``.
callback = "hotness.consumers:BugzillaTicketFiler"

# In case of the-new-hotness we are listening to three topics, so we
# create a new binding for them
[[bindings]]
queue = "the-new-hotness"
exchange = "amq.topic"
routing_keys = [
   "org.release-monitoring.prod.anitya.project.version.update",
   "org.release-monitoring.prod.anitya.project.map.new",
   "org.fedoraproject.prod.buildsys.task.state.change",
]

# Define a queue
[queues.the-new-hotness]
durable = true
auto_delete = false
exclusive = false
arguments = {}

Any application specific configuration should go to consumer_config section Configuration.

Init method

The BugzillaTicketFiler class in consumers.py is doing all the consuming work. First we need to change the inheritance of this class.

Then we need to modify the __init__ method and use the fedora_messaging.config.conf dictionary instead of the fedmsg configuration. The __init__ method could look something like this after the change:

from fedora_messaging.config import conf

class BugzillaTicketFiler:
"""
A fedora-messaging consumer that is the heart of the-new-hotness.

This consumer subscribes to the following topics:

* 'org.fedoraproject.prod.buildsys.task.state.change'
  handled by :method:`BugzillaTicketFiler.handle_buildsys_scratch`

* 'org.release-monitoring.prod.anitya.project.version.update'
  handled by :method:`BugzillaTicketFiler.handle_anitya_version_update`

* 'org.release-monitoring.prod.anitya.project.map.new'
  handled by :method:`BugzillaTicketFiler.handle_anitya_map_new`
"""

def __init__(self):

    # This is just convenient.
    self.config = conf["consumer_config"]

    ...

Note

Unrelated code was deleted from the example.

Wrapper function

The next step is to change consume method to __call__ method. This is pretty straightforward. After this modification __call__ method should look like this:

def __call__(self, msg):
    """
    Called when a message is received from queue.

    Params:
        msg (fedora_messaging.message.Message) The message we received
            from the queue.
    """
    topic, body, msg_id = msg.topic, msg.body, msg.id
    _log.debug("Received %r" % msg_id)

    if topic.endswith("anitya.project.version.update"):
        self.handle_anitya_version_update(msg)
    elif topic.endswith("anitya.project.map.new"):
        self.handle_anitya_map_new(msg)
    elif topic.endswith("buildsys.task.state.change"):
        self.handle_buildsys_scratch(msg)
    else:
        _log.debug("Dropping %r %r" % (topic, body))
        pass

In this case we are working with the message using the standard fedora_messaging.message.Message methods. It is always better to use schema specific methods for any topic you are receiving.

Testing

To prepare the-new-hotness for testing checkout the requirements.txt file and devel folder from master branch:

git checkout master devel requirements.txt

This will convert development environment to the state when it’s ready for Fedora Messaging. In a configured development environment we can easily test our conversion.

Start app by using alias hotstart, this will start the systemd service with following command fedora-messaging consume. The systemd unit could be find in .config/systemd/user/.

For testing you can use any message from datagrepper. Just add /raw?category=<application name>&delta=259200 to URL and pick any message. For example category for Anitya is anitya.

To send the message you need simple publisher. One is created for the new hotness in devel/fedora_messaging_replay.py. To send the message you can use any message id found in datagrepper:

python3 devel/fedora_messaging_replay.py <msg_id>

And now you can check if the message was received using hotlog alias, which shows the journal log for the-new-hotness.