Following How Hydra works – Part 1, everyone should be a CouchDb expert, so we can move on to its use in creating a messaging system. The design aims for Hydra were to create something fully distributed and very simple, with guaranteed message delivery. Distributed here means no single (or even multiple) points of failure: if any node in the Hydra network is running then the system should be usable. It also means (in my usage) that every node should be like every other node (no live/backup differentiation), and that node failure and recovery should be transparent to users and administrators. I wanted a feeling of isotropy, where every part of the system is just like the whole. Very simple means that I didn’t want to track publishers or subscribers, or deal with the issues of subscribers disappearing, lease lifetimes and so on. I was also keen, if possible, not to have any server component other than the CouchDb database, so that there is less to go wrong. Guaranteed message delivery means that every message eventually gets to all its recipients; in practice it should get there as soon as network connectivity allows.

The design process then went something like this (though with many detours):

  1. If every node has to be a fully-capable system on its own, then any client should be able to get any message from any node. That means every message must be delivered to every node. Full-mesh CouchDb replication gives us that.
  2. If the infrastructure does not keep track of publishers and subscribers then a pull-based mechanism is better than a push-based one. A push-based mechanism would require every message to be delivered to every client, as the system would not know whether a message is of interest to a particular client or not. In a pull-based mechanism clients can just request the messages they are interested in.
  3. But, as we saw in Part 1, we can’t just do arbitrary queries on a CouchDb database along the lines of “give me all the messages of some form”. We need to create CouchDb views that index the messages we’re interested in, and then query those views to extract the messages.
  4. If the system is to be at all efficient, views need to do better than “give me all the messages of some form”. They need to support the query “give me all new messages of some form”. That is – clients need to be able to poll the database for the messages they have not previously seen.
  5. That suggests that messages should contain a timestamp. We can then construct views that include the timestamp so that messages in the view become time-ordered. If the client wants messages since some point in time, they can then poll for items from the view with a higher timestamp. For example, if we want to find messages with a specific value of the topic field, then we construct a view that indexes on [topic, timestamp] and then clients can poll for elements between [myTopic, initialTimestamp] and [myTopic, aVeryLongWayIntoTheFuture].
  6. However, relying on the client to create the timestamp is tricky because anyone might write a client, and we then rely on them to include the timestamp and encode it in the right way as a JSON value (this is non-trivial as there is no general standard for encoding timestamps as JSON). It would be better if timestamping was taken out of the client’s hands, and done centrally. However, the “very simple” rule above means that can only be done by CouchDb. CouchDb cannot fiddle with documents as they are entered into the database, so it cannot create the timestamp as a message field. The only control it has is that CouchDb can be asked to generate the unique document id. Twitter's Snowflake algorithm provides a solution to this: make the document id a string consisting of a timestamp prefix plus a machine identifier suffix. The machine identifier ensures uniqueness if messages with the same timestamp are posted to different CouchDb instances.
  7. This leads to a diversion into the innards of CouchDb and its implementation language, Erlang. Without going into details, it turns out that a) it is fairly easy to write the Erlang to generate document ids of the right form, b) the Erlang clock has the convenient property that it is guaranteed to generate monotonically increasing values each time Now() is called, and c) the CouchDb community are quite accommodating and willing to incorporate that Erlang into the CouchDb distribution. The end result is that the machine identifier can be set in the CouchDb ini file on installation, and then CouchDb can generate document ids for us.
  8. So we can generate document ids such that ordering by document id is the same as ordering by message posting time. When properly indexed this allows clients to poll the database for all messages since some point in time, which allows for features such as message replay, or catching up with missed messages if a client is offline for a while. Ideally it could be used for regular polling for new messages as well: clients just keep track of the last message seen, and poll for ones with a later timestamp. But this is not safe, for a couple of reasons. First, messages may be delayed en route from one CouchDb database to another by network problems, which can lead to messages with "old" timestamps turning up at a database and being missed by clients that have already received messages with a later timestamp. Secondly, CouchDb replicates documents in parallel, and does not guarantee that they arrive at a replica in the order they were created, so again messages can arrive with out-of-order timestamps and be missed in polling.
  9. There is an answer to this though: CouchDb database changes have a sequence number, and clients can poll for all changes since a specified sequence number. So the regular polling process for new documents can ask for changes since the last poll, and then determine which of those are of interest to the client.
  10. Now we have a working scheme. On startup, if a client wants to catch up with any missed messages, it requests all messages since the timestamp of the last message it saw. Thereafter it polls for changes, and processes any new messages. If the local CouchDb server fails, then the client switches to another one, asks for messages since the last timestamp it saw, and then goes into the normal polling procedure again.
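As a concrete illustration of step 1, here is a minimal Python sketch of the replication documents a full mesh needs. The node URLs are made up, and a real deployment would also need credentials; each document would be POSTed to a node's /_replicate endpoint (or written to the _replicator database).

```python
# Sketch: build the continuous replication documents for a full mesh of
# CouchDb nodes, so every message posted anywhere reaches every other node.
# Node URLs and the database name "hydra" are illustrative assumptions.

def full_mesh_replications(node_urls, db="hydra"):
    """One continuous replication per ordered pair of distinct nodes."""
    return [
        {"source": f"{src}/{db}", "target": f"{dst}/{db}", "continuous": True}
        for src in node_urls
        for dst in node_urls
        if src != dst
    ]

nodes = ["http://node1:5984", "http://node2:5984", "http://node3:5984"]
reps = full_mesh_replications(nodes)
# 3 nodes give 6 directed replications, covering every pair in both directions
```

Note that the number of replications grows quadratically with the number of nodes, which is one reason the design suits small meshes of peers.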
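Step 5's view and its range query can be sketched like this. The map function itself is JavaScript held as a string, as CouchDb requires; the `topic` field name, and the use of the document id as the timestamp component of the key, are illustrative assumptions.

```python
import json

# Sketch of a view keyed on [topic, timestamp] and the query parameters a
# client would use to poll it for "all messages on myTopic since X".

design_doc = {
    "_id": "_design/hydra",
    "views": {
        "by_topic": {
            # CouchDb map functions are JavaScript.  The document id serves
            # as the timestamp, since ids are time-ordered (see step 6).
            "map": "function(doc) { if (doc.topic) emit([doc.topic, doc._id], null); }"
        }
    },
}

def poll_params(topic, last_seen_id):
    """Query-string parameters for polling the by_topic view."""
    return {
        "startkey": json.dumps([topic, last_seen_id]),
        # In CouchDb's collation order, objects sort after all strings, so
        # {} plays the role of "aVeryLongWayIntoTheFuture".
        "endkey": json.dumps([topic, {}]),
        "include_docs": "true",
    }
```

The `{}` end key is the standard CouchDb idiom for an open-ended range: object values collate after every string, so the range captures all timestamps from `last_seen_id` onwards.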
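The Snowflake-style ids of step 6 are generated by Erlang inside CouchDb itself, but the idea can be sketched in Python. The exact encoding here (fixed-width hex microseconds plus a machine-identifier suffix) is an assumption for illustration, not the real format.

```python
import time

# Illustrative analogue of the Snowflake-style document ids: a timestamp
# prefix plus a machine identifier.  Fixed-width hex is used so that
# lexicographic ordering of ids matches chronological ordering; the machine
# id keeps ids unique when two CouchDb instances stamp the same microsecond.

def make_doc_id(machine_id: str) -> str:
    micros = int(time.time() * 1_000_000)
    return f"{micros:016x}-{machine_id}"
```

Note that, unlike this sketch, the real implementation leans on the Erlang clock's monotonicity guarantee, so two calls on the same node can never produce ids out of order.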
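The regular polling loop of step 9 amounts to little more than the following sketch. The shape of the `results` list mimics CouchDb's /db/_changes response; the filtering predicate and field names are assumptions.

```python
# Sketch of one round of change polling: the client asks _changes for
# everything since its last sequence number (the server does that filtering
# via the since= parameter), then keeps only the documents it cares about.

def process_changes(results, last_seq, interested):
    """`results` mimics the "results" array of a /db/_changes response:
    [{"seq": ..., "id": ..., "doc": {...}}, ...]."""
    new_messages = [r["doc"] for r in results if interested(r.get("doc", {}))]
    if results:
        last_seq = results[-1]["seq"]  # remember where to poll from next time
    return new_messages, last_seq

# Hypothetical usage: only "news" messages interest this client.
results = [
    {"seq": 101, "id": "a1", "doc": {"topic": "news", "data": "hi"}},
    {"seq": 102, "id": "a2", "doc": {"topic": "other", "data": "x"}},
]
msgs, seq = process_changes(results, 100, lambda d: d.get("topic") == "news")
```

Because the sequence number is assigned locally by each CouchDb instance as documents arrive, it sidesteps the out-of-order-timestamp problem from step 8: nothing that arrives can be missed, however late it replicated.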

And that’s all the hard parts of the system design done. All that remains is to wrap the client-side polling, and the tracking of the last document id per entrypoint, in a .NET library. That takes us to the point where using messaging just involves:

  1. Creating a message class with whatever properties you want. This must inherit from a base class that takes care of the document id, serialisation, deserialisation, and posting. There is a standard HydraMessage class that saves even this step: it has standard Topic, Source, Destination etc. fields, and a Data field into which you put your specific information.
  2. If you want to poll for messages based on non-standard criteria (currently other than Topic/Destination), you need to write a CouchDb view to index on those criteria. You also need to write a .NET MessageFetcher class to call the view, handing it the values of your criteria. There are predefined MessageFetchers for Topic/Destination criteria.
  3. Polling for messages by instantiating the predefined Poller class and telling it which MessageFetcher to use. Poller is a generic class and deserialises incoming messages to the type of its class parameter; they are then raised as events on the Poller object.
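The real Poller is a .NET generic class raising .NET events, so the following Python sketch is only an analogue of the pattern, with all names illustrative: a poller owns a message fetcher, pulls new messages through it, and hands each one to subscribed handlers.

```python
# Illustrative analogue of the Poller/MessageFetcher split.  The fetcher is
# any callable taking the last-seen position and returning (messages, new
# position); the poller tracks the position and dispatches to handlers.

class Poller:
    def __init__(self, fetcher):
        self._fetcher = fetcher
        self._handlers = []
        self._last_seen = None  # no position yet: first poll fetches from the start

    def on_message(self, handler):
        """Subscribe a callback, standing in for a .NET event handler."""
        self._handlers.append(handler)

    def poll_once(self):
        messages, self._last_seen = self._fetcher(self._last_seen)
        for msg in messages:
            for handler in self._handlers:
                handler(msg)
```

A MessageFetcher built on a topic view (as in point 2) would slot in as the `fetcher` argument; swapping fetchers changes the selection criteria without touching the polling machinery.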

Now you know everything there is to know about Hydra.

Last edited Apr 28, 2012 at 12:46 PM by NickNorth, version 5

