The Ramblings Of A Coder by Omer Anson

Codewalk: Dragonflow's Publish-Subscribe Infrastructure - Part 2 - Usage

10 May 2016

In this three-parter I will go over and annotate the code for Dragonflow’s publish/subscribe mechanism. In this second part, I will go over and annotate two key locations where the API is used. Namely the NbApi class, and the publisher service. In Part 1 I introduced the API, and in Part 3 I will show driver implementation examples.

Introduction

In the previous part, I went over the API of Dragonflow’s publish/subscribe mechanism. In this part, I will go over two key locations where this API is used.

The NbApi, which will hopefully be covered in another post, is the abstraction layer used to access the database. For instance, database changes are done via this class, when Dragonflow is run as a Neutron service plugin.

The publisher service is an external process that is currently used to proxy the published events, in cases where the publishers cannot do so directly. This happens when the publisher need complete control over the port to which it binds, e.g. when using TCP.

Usage

NbApi

The code for NbApi is available here. I will not go over the entire class now. However, I will show some usage of the Pub/Sub API. Out of context, this may seem a bit unclear. I will (hopefully) go over this again in a post dedicated to NbApi.

As a first step, the NbApi needs to get an instance of the publisher and the subscriber. This is done in _get_publisher and _get_subscriber methods.

    def _get_publisher(self):
        if cfg.CONF.df.pub_sub_use_multiproc:
            pubsub_driver_name = cfg.CONF.df.pub_sub_multiproc_driver
        else:
            pubsub_driver_name = cfg.CONF.df.pub_sub_driver
        pub_sub_driver = df_utils.load_driver(
                                    pubsub_driver_name,
                                    df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
        return pub_sub_driver.get_publisher()

In the _get_publisher case, the publisher may be a cross-node publisher (in case of a broker based pub/sub mechanism, e.g. Redis) or an inter-process publisher (e.g. ZMQ). The code checks the configuration to see which publisher is needed, and then loads and constructs (using df_utils.load_driver) the class of the correct manager. It then returns the manager’s publisher instance.

    def _get_subscriber(self):
        pub_sub_driver = df_utils.load_driver(
                                    cfg.CONF.df.pub_sub_driver,
                                    df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
        return pub_sub_driver.get_subscriber()

The _get_subscriber case is simpler, since the cross-node subscriber is always used. Therefore, the code loads and constructs (again, using df_utils.load_driver) the cross-node pub/sub manager, and returns the subscriber instance the manager returns.

I would like to expand on the configuration options:

These values are mapped in the setup.cfg file, in the dragonflow.pubsub_driver section. This is the value of df_utils.DF_PUBSUB_DRIVER_NAMESPACE.

For example, the following is the configuration used for ZMQ

[df]
# ... More configuration
pub_sub_driver = zmq_pubsub_driver
pub_sub_multiproc_driver = zmq_pubsub_multiproc_driver

Once the publisher and subscriber are created, they need to be started. The publisher is started simply with a call to self.publisher.initialize(). The subscriber is started with a call to the method _start_subscriber.

    def _start_subsciber(self):
        self.subscriber.initialize(self.db_change_callback)
        # TODO(oanson) Move publishers_ips to DF DB.
        publishers_ips = cfg.CONF.df.publishers_ips
        for ip in publishers_ips:
            uri = '%s://%s:%s' % (
                cfg.CONF.df.publisher_transport,
                ip,
                cfg.CONF.df.publisher_port
            )
            self.subscriber.register_listen_address(uri)
        self.subscriber.daemonize()

The _start_subscriber starts by initialising the subscriber with a call to initialize, passing self.db_change_callback as the callback. Then the code reads all the publishers from configuration (In future also from the distributed database) and passes their URI to register_listen_address. Lastly, the code calls daemonize, which starts a new thread, and calls the subscriber’s run method within it, to continuously wait for events.

This is the db_change_callback method.

    def db_change_callback(self, table, key, action, value, topic=None):
        update = DbUpdate(table, key, action, value, topic=topic)
        LOG.info(_LI("Pushing Update to Queue: %s"), update)
        self._queue.put(update)
        eventlet.sleep(0)

The db_change_callback receives all the arguments of an event (i.e. table, key, action, value, and topic). It wraps these arguments in a DbUpdate instance, and adds it to a queue (the member _queue) for processing. This processing will be done later, by another thread.

Since we are using greenthreads (which mean cooperative threading), the call to eventlet.sleep is necessary to allow that other thread to work. The call to db_change_callback is done in the subscriber’s thread, and we want the actual event handling to be done in NbApi’s thread.

Once NbApi is ready to send an event via the pub/sub mechanism, it calls the method _send_db_change_event.

    def _send_db_change_event(self, table, key, action, value, topic):
        if self.use_pubsub:
            if not self.enable_selective_topo_dist:
                topic = SEND_ALL_TOPIC
            update = DbUpdate(table, key, action, value, topic=topic)
            self.publisher.send_event(update)
            eventlet.sleep(0)

This method receives all the parameters of an event (table, key, action, value, and topic). If the pub/sub mechanism is not used (denoted by the member use_pubsub), then the method returns and does nothing. Otherwise, the method continues.

If selective topology distribution feature is not used (denoted by the member enable_selective_topo_dist), then the code overrides the topic with the special topic SEND_ALL_TOPIC, which is the topic to which all subscribers listen.

Finally, the update object is created with a call to DbUpdate, and sent via the publisher by calling self.publisher.send_event(update). Since we are using greenthreads (which mean cooperative threading), the call to eventlet.sleep allows other threads to work, if necessary. Specifically, the send_event method may use a worker thread too.

Publisher Service

Another location that heavily uses the Pub/Sub API is the Dragonflow publisher service. This is the service that runs on the Controller Node, and binds the IP and port to allow subscribers to connect.

The code for this module can be found here.

def main():
    cfg.CONF.register_opts(common_params.df_opts, 'df')
    common_config.init(sys.argv[1:])
    config.setup_logging()
    service = PublisherService()
    service.initialize()
    service.run()

Since this is a service, it runs as a separate process. This process starts at the main function.

The main function registers the Dragonflow configuration options, and reads the given configuration files with a call to common_config.init. It also sets up the logging mechanism (by calling config.setup_logging).

Lastly, it creates an instance of PublisherService, initializes it, and calls its main function.

To allow the python script to be called without specifying that we want the main function in particular, the following code exists.

if __name__ == "__main__":
    main()

This code verifies that this python module is the main module, as opposed to being imported from another module, and then calls the main function.

The main body of the publisher service is implemented in the PublisherService class. It is a bit long, so again I separate it into chewable components.

class PublisherService(object):
    def __init__(self):
        self._queue = Queue()
        self.publisher = self._get_publisher()
        self.multiproc_subscriber = self._get_multiproc_subscriber()
        nb_driver_class = importutils.import_class(cfg.CONF.df.nb_db_class)
        self.db = nb_driver_class()
        self.uuid = pub_sub_api.generate_publisher_uuid()
        self._rate_limit = df_utils.RateLimiter(2, 30)

The PublisherService class doesn’t inherit from any other class.

The __init__ (constructor) method initialises the instance’s members. Specifically, these members are:

As an aside, the uuid is generated by generate_publisher_uuid, available here.

def generate_publisher_uuid():
    """
    Generate a non-random uuid based on the fully qualified domain name.
    This UUID is supposed to remain the same across service restarts.
    """
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, socket.getfqdn()))

It returns a textual representation (call to str) of the uuid, which is generated by a SHA-1 hash (call to uuid.uuid5) on the DNS namespace (Retrieved with uuid.NAMESPACE_DNS) and the node’s fully qualified domain name (Retrieved using socket.getfqdn()).

The publisher is constructed with a call to _get_publisher.

    def _get_publisher(self):
        pub_sub_driver = df_utils.load_driver(
                                    cfg.CONF.df.pub_sub_driver,
                                    df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
        return pub_sub_driver.get_publisher()

The selected publisher is always the cross-node publisher. Therefore, we retrieve an instnace of the cross-node pub/sub manager using df_utils.load_driver, and return the publisher it creates.

Similarly, the subscriber is constructed with a call to _get_multiproc_subscriber.

    def _get_multiproc_subscriber(self):
        """
        Return the subscriber for inter-process communication. If multi-proc
        communication is not use (i.e. disabled from config), return None.
        """
        if not cfg.CONF.df.pub_sub_use_multiproc:
            return None
        pub_sub_driver = df_utils.load_driver(
                                    cfg.CONF.df.pub_sub_multiproc_driver,
                                    df_utils.DF_PUBSUB_DRIVER_NAMESPACE)
        return pub_sub_driver.get_subscriber()

Firstly, this method verifies that inter-process publish/subscribe is used. We will see later why the publisher service may be used without it.

If pub_sub_use_multiproc is false, i.e. inter-process publish/subscribe is not used, the subscriber is None. Otherwise, the code retrieves an instance of the inter-process pub/sub manager, and returns its subscriber.

    def initialize(self):
        if self.multiproc_subscriber:
            self.multiproc_subscriber.initialize(self._append_event_to_queue)
        self.publisher.initialize()
        # TODO(oanson) TableMonitor initialisation goes here

The initialize method initialises the subscriber (if it exists), and the publisher.

In future, there are plans to add database table monitors to the publisher service. A table monitor will regularly sample a database table, and if it has changed, send an event. This is one of the reasons a publisher service may be required, even if inter-process publish/subscribe is not used.

The callback function given to the subscriber is _append_event_to_queue.

    def _append_event_to_queue(self, table, key, action, value, topic):
        event = db_common.DbUpdate(table, key, action, value, topic=topic)
        self._queue.put(event)
        eventlet.sleep(0)

Very similarly to NbApi’s implementation, this method also wraps the event arguments in a DbUpdate instance, and adds them to the queue. Recall that this method runs on the subscriber’s thread. It calls eventlet.sleep to allow the publisher service’s main thread to take over and process the event.

    def run(self):
        if self.multiproc_subscriber:
            self.multiproc_subscriber.daemonize()
        self.db.initialize(
            db_ip=cfg.CONF.df.remote_db_ip,
            db_port=cfg.CONF.df.remote_db_port,
            config=cfg.CONF.df
        )
        self._register_as_publisher()
        self._publishers_table_monitor = pub_sub_api.StalePublisherMonitor(
            self.db,
            self.publisher,
            cfg.CONF.df.publisher_timeout
        )
        self._publishers_table_monitor.daemonize()
        # TODO(oanson) TableMonitor daemonize will go here
        while True:
            try:
                event = self._queue.get()
                self.publisher.send_event(event)
                if event.table != pub_sub_api.PUBLISHER_TABLE:
                    self._update_timestamp_in_db()
                eventlet.sleep(0)
            except Exception as e:
                LOG.warning(_LW("Exception in main loop: {}, {}").format(
                    e, traceback.format_exc()
                ))
                # Ignore

The run method can be separated to two main parts - some more initialisation, and the main loop (It may have been better to move the initialisation code to initialize).

In the initialisation section, the code first creates the subscriber thread, and starts its main loop, by calling the subscriber’s daemonize method. This is done only if the subscriber exists (which only happens when multi-proc pub/sub is used).

It then initializes the member db, which connects to the database. The database parameters, such as IP and port, are read from the configuration.

The publisher service registers itself as a publisher via _register_as_publisher, which I will go over shortly.

It then creates a StalePublisherMonitor instance, which monitors the publishers’ table in the database, and removes publishers which have been idle for too long. This instance also runs in a separate thread, so the call to daemonize starts up that new thread, and calls the monitor’s main method in that thread.

Lastly, once other table monitors will be migrated to the publisher service, their initialisation and daemonisation can be done in place of the comment.

In the main loop section is surrounded by a while True loop. It waits for an event to appear in the queue by calling _queue.get(). This method blocks until an element appears in _queue. Once an event arrives, either from the subscriber thread or one of the table monitors, the publisher service forwards it to the subcribers via the publisher’s send_event method.

If the event did not originate from the publishers’ table, the publisher’s timestamp is also updated with a call to _update_timestamp_in_db. For this purpose, updates from the publishers’ table are ignored. Otherwise, this would cause a feedback loop, where a publishers’ table update would update the publisher’s timestamp in the publishers’ table.

Lastly, the other threads are given a chance to run by calling eventlet.sleep.

In case an exception occurs, it is ignored. The publisher service tries to continue working. It does emit a warning to the log, to facilitate debugging.

In the initialisation phase of run, there was a call to _register_as_publisher.

    def _register_as_publisher(self):
        publisher = {
            'id': self.uuid,
            'uri': self._get_uri(),
            'last_activity_timestamp': time.time(),
        }
        publisher_json = jsonutils.dumps(publisher)
        self.db.create_key(
            pub_sub_api.PUBLISHER_TABLE,
            self.uuid, publisher_json
        )

This method adds a publisher record to the publishers’ table in the database. The publisher record includes:

This information is serialised to a json string, and added to the database with the publisher’s id as key.

The publisher’s URI is retrieved with a call to _get_uri.

    def _get_uri(self):
        ip = cfg.CONF.df.publisher_bind_address
        if ip == '*' or ip == '127.0.0.1':
            ip = cfg.CONF.df.local_ip
        return "{}://{}:{}".format(
            cfg.CONF.df.publisher_transport,
            ip,
            cfg.CONF.df.publisher_port,
        )

The transport, IP, and port are taken from the configuration. The method then concatenates the parameters to a URI. e.g. if the transport is ‘tcp’, the port is 1234, and the IP is 192.168.17.105, then the result is ‘tcp://192.168.17.105:1234’.

The IP address in the configuration may be ‘*’, or ‘127.0.0.1’. This is to say that the publisher can be connected to using any of the node’s IP addresses. Therefore, the node’s IP address is read from configuration, and used instead.

Lastly, we saw that the publisher service updates its timestamp by calling _update_timestamp_in_db.

    def _update_timestamp_in_db(self):
        if self._rate_limit():
            return
        try:
            publisher_json = self.db.get_key(
                pub_sub_api.PUBLISHER_TABLE,
                self.uuid,
            )
            publisher = jsonutils.loads(publisher_json)
            publisher['last_activity_timestamp'] = time.time()
            publisher_json = jsonutils.dumps(publisher)
            self.db.set_key(
                pub_sub_api.PUBLISHER_TABLE,
                self.uuid,
                publisher_json
            )
        except exceptions.DBKeyNotFound:
            self._register_as_publisher()

Firstly, this method verifies that it does not flood the deployment with modifications in the publishers’ table, by calling the rate limiter via the _rate_limit member. This is important, since database modifications are constly in and of themselves, and they generate events that other nodes have to process.

The method then reads the existing publisher record from the database by its ID with a call to db.get_key. It parses the JSON string to a python dictionary using jsonutils.loads. It then updates the last_activity_timestamp field with the current time (retrieved with time.time()). It then serialises the dictionary back to a json string (jsonutils.dumps) and updates the database (with db.set_key).

In case the publisher record is not found in the database, a DBKeyNotFound exception is raised. In this case, the publisher creates the record anew in the database.

Conclusion

In this part, we have seen how the publish/subscribe mechanism is used.

In the next part, I will show two driver implementations of it.