Conversations

Messaging systems should support different kinds of communication between applications. An important requirement is to allow two applications to hold a conversation: a sequence of messages between two specified parties, tied together in some way so that they can all be identified as part of the same conversation. For example, the OPS Job Processor uses conversations: a client sends an initial request to perform some job, with parameters; the Job Processor notifies the client that it is working on the job, sends periodic progress updates, and finally sends the results back. The messages are identified as part of the same conversation by a Handle field, which is just a GUID.

There are two elements to the Hydra conversation convention: the Switchboard and the Conversation. Any application taking part in conversations instantiates a Switchboard object per conversation type to manage them: this listens for new incoming calls, sets up new outgoing ones, and ensures that messages are routed to the right Conversation objects. Each conversation is handled by a Conversation object: it receives incoming messages, raising them to its owner, and sends outgoing ones, tagging them with the right routing information. The information passed back and forth during a conversation consists of serialised objects of some chosen type – so the Switchboard and Conversation classes are generic, being parameterised by the chosen type.
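To illustrate the routing idea only, here is a minimal in-memory sketch of how messages tagged with a handle might be dispatched to per-conversation objects. Envelope, SketchConversation and SketchSwitchboard are hypothetical names invented for this example; they are not Hydra's classes and say nothing about how Hydra implements its Switchboard.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for illustration only; these are NOT Hydra's classes.
public sealed class Envelope<T>
{
    public string Handle { get; set; }   // conversation identifier (e.g. a GUID string)
    public T Body { get; set; }
}

public sealed class SketchConversation<T>
{
    public string Handle { get; private set; }
    public event Action<T> Received;

    public SketchConversation(string handle) { Handle = handle; }

    // Called by the switchboard when a message for this handle arrives.
    internal void Deliver(T body)
    {
        var handler = Received;
        if (handler != null) handler(body);
    }
}

public sealed class SketchSwitchboard<T>
{
    private readonly Dictionary<string, SketchConversation<T>> _byHandle =
        new Dictionary<string, SketchConversation<T>>();

    // Raised when a message arrives with a handle that has not been seen before.
    public event Action<SketchConversation<T>> NewConversation;

    // Starts an outgoing conversation with a fresh handle.
    public SketchConversation<T> Start()
    {
        var conversation = new SketchConversation<T>(Guid.NewGuid().ToString());
        _byHandle[conversation.Handle] = conversation;
        return conversation;
    }

    // Routes an incoming message to the conversation that owns its handle,
    // creating (and announcing) a new conversation for an unknown handle.
    public void Route(Envelope<T> envelope)
    {
        SketchConversation<T> conversation;
        if (!_byHandle.TryGetValue(envelope.Handle, out conversation))
        {
            conversation = new SketchConversation<T>(envelope.Handle);
            _byHandle[envelope.Handle] = conversation;
            var handler = NewConversation;
            if (handler != null) handler(conversation);
        }
        conversation.Deliver(envelope.Body);
    }
}
```

The sketch is not thread-safe and never forgets a handle; a real implementation would need to deal with both.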

Typically a conversation participant will have a class of either client or server objects that are responsible for handling a conversation: they will listen out for messages, process them appropriately, and send replies as necessary. These objects will have a Conversation object embedded in them: this will raise incoming messages and be used to send outgoing ones. If the application initiates conversations, it will use the Switchboard to create a new Conversation object and then pass it to a newly created client object to handle the conversation. If the application responds to conversation requests, its Switchboard will hand out a new Conversation object when a request comes in; the application will then create a server object, pass it the Conversation object, and let the server handle the conversation from there.

Some code should make this more concrete. Say we have a simple conversation whose messages are objects of the ConversationDto type below:

public class ConversationDto
{
    public MessageTypes MessageType { get; set; }
    public string Data { get; set; }
}

public enum MessageTypes
{
    Init, Ack, Request, Response, End
}

The idea here is that the client sends an initial Init message with any initial data needed to set up the conversation; the server responds with an Ack; the client then sends a series of requests, containing data; the server responds to each one with a Response, containing the reply data; and then the client finally sends an End message.
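The sequence above can be read as a small state machine. The following is a sketch only: ProtocolTracker is a hypothetical helper, not part of Hydra, and it assumes that requests and responses strictly alternate, which the description does not actually require. The enum is repeated so that the snippet compiles on its own.

```csharp
using System;

public enum MessageTypes { Init, Ack, Request, Response, End }

// Hypothetical helper, not part of Hydra: tracks which message type may come next.
public sealed class ProtocolTracker
{
    private enum State { AwaitingInit, AwaitingAck, Idle, AwaitingResponse, Ended }

    private State _state = State.AwaitingInit;

    // Returns true and advances if the message is valid in the current state;
    // otherwise returns false and leaves the state unchanged.
    public bool TryAdvance(MessageTypes type)
    {
        switch (_state)
        {
            case State.AwaitingInit:
                if (type == MessageTypes.Init) { _state = State.AwaitingAck; return true; }
                return false;
            case State.AwaitingAck:
                if (type == MessageTypes.Ack) { _state = State.Idle; return true; }
                return false;
            case State.Idle:
                if (type == MessageTypes.Request) { _state = State.AwaitingResponse; return true; }
                if (type == MessageTypes.End) { _state = State.Ended; return true; }
                return false;
            case State.AwaitingResponse:
                if (type == MessageTypes.Response) { _state = State.Idle; return true; }
                return false;
            default:
                return false; // Ended: nothing further is valid
        }
    }
}
```

Either party could feed every sent and received message type through such a tracker and ignore anything for which TryAdvance returns false.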

For the purposes of illustration let's say that the client side of the application is a Winforms app that creates a new client when the user clicks a button. Then the main form code will look like this:

public partial class Form1 : Form
{
    private readonly Switchboard<ConversationDto> _switchboard;

    public Form1()
    {
        InitializeComponent();
        var hydraService = new HydraService(...);
        _switchboard = new Switchboard<ConversationDto>(hydraService, "AppendClient");
    }

    private void NewBtn_Click(object sender, EventArgs e)
    {
        var client = _switchboard.NewConversation("AppendServer");
        var clientUi = new AppendClientUi();
        clientUi.Init(client, SuffixBox.Text);
        ClientPanel.Controls.Add(clientUi);
    }
}

Form initialisation creates a Switchboard<ConversationDto> object, handing it a HydraService object (see Publish/subscribe by type for more on this), and telling it that the name of this application is AppendClient. This will cause the Switchboard to listen out for ConversationDto messages sent to AppendClient. When the user clicks the button to start a new conversation, the application uses the Switchboard to create a new Conversation object, giving it the remote party for the conversation as an argument: AppendServer in this case. This is handed to a newly created AppendClientUi object, which is added to the form for the user to monitor the conversation.

The AppendClientUi is a UserControl (MVVM fans look away now: this is just an example, and not recommended coding style). This has some labels to show the state of the conversation and a couple of buttons to allow the user to send new requests or end the conversation. The code behind looks like this:

public partial class AppendClientUi : UserControl
{
    private Conversation<ConversationDto> _conversation;
    private IDisposable _subscription;

    public AppendClientUi()
    {
        InitializeComponent();
    }

    public void Init(Conversation<ConversationDto> conversation, string suffix)
    {
        HandleLbl.Text += " " + conversation.Handle;
        SuffixLbl.Text += " " + suffix;

        _conversation = conversation;
        _subscription = _conversation.ObserveOn(SynchronizationContext.Current).Subscribe(OnNext);
        _conversation.Send(new ConversationDto { MessageType = MessageTypes.Init, Data = suffix });
    }

    private void OnNext(ConversationDto message)
    {
        ResponseLbl.Text = string.Format("Last response: {0}, {1}", message.MessageType, message.Data);
    }

    private void RequestBtn_Click(object sender, EventArgs e)
    {
        _conversation.Send(new ConversationDto { MessageType = MessageTypes.Request, Data = RequestBox.Text });
    }

    private void EndBtn_Click(object sender, EventArgs e)
    {
        _conversation.Send(new ConversationDto { MessageType = MessageTypes.End });
        _subscription.Dispose();
        _conversation.Dispose();
    }
}

In the Init method, the object populates the labels with some conversation properties, saves the conversation to a field, subscribes to the conversation’s messages, and then uses the conversation object to send an Init message to the server. The subscription needs a little explanation as it uses the Microsoft Reactive Extensions: the ObserveOn method marshals everything onto the UI thread to save having to use Invoke later; the Subscribe method then tells it to call OnNext for each message that comes in; and the _subscription result is an IDisposable that we dispose of when we want to stop listening for new messages.
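For readers who haven't met the subscribe-and-dispose pattern before, here is a minimal sketch of it using only the IObservable and IObserver interfaces from the base class library. SketchSubject and ActionObserver are hypothetical names made up for this example; Rx and Hydra supply their own, far more capable, implementations, and nothing here shows how they work internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal observable, for illustration only.
public sealed class SketchSubject<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // Registers the observer and returns a token that unregisters it when disposed.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(() => _observers.Remove(observer));
    }

    // Pushes a value to every current subscriber.
    public void Publish(T value)
    {
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly Action _remove;
        public Unsubscriber(Action remove) { _remove = remove; }
        public void Dispose() { _remove(); }
    }
}

// Adapts a plain delegate to IObserver<T>; errors and completion are ignored.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    public ActionObserver(Action<T> onNext) { _onNext = onNext; }
    public void OnNext(T value) { _onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Once the token returned by Subscribe has been disposed, later calls to Publish no longer reach that observer. The sketch is not thread-safe.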

The OnNext method will now be called for every incoming message in this conversation, and we just show the message type and data in the UI (without needing to use Invoke, as noted above). The button click methods send self-explanatory messages using the Conversation object, with the EndBtn_Click method tidying up the subscription and conversation. (When the conversation is disposed, it notifies the switchboard that it is shutting down, all further incoming messages in its conversation are quietly dropped, and attempts to send further messages are ignored.)

Now for the server side of the application. This is also a Winforms app, but has no UI other than a blank form. The code behind the form is very simple:

public partial class Form1 : Form
{
    private readonly HashSet<AppendServer> _servers = new HashSet<AppendServer>();

    public Form1()
    {
        InitializeComponent();
        var hydraService = new HydraService(...);
        new Switchboard<ConversationDto>(hydraService, "AppendServer").Subscribe(OnNext);
    }

    private void OnNext(Conversation<ConversationDto> conversation)
    {
        _servers.Add(new AppendServer(conversation));
    }
}

The initialisation code creates a Switchboard that listens for messages to AppendServer. The Subscribe call ensures that OnNext will be called whenever a new conversation is started up, being passed the Conversation object. (Note that the Switchboard instance on the client side also notifies its subscribers of any new conversations that are started up, but we chose to ignore them, as we're not expecting any incoming conversations to the AppendClient app.) OnNext just creates a new AppendServer object, passes it the conversation, and adds it to a collection in the form to keep it alive.

The AppendServer class does all the hard work. As you might guess from the name, it appends a string to the Data of incoming Request messages, and sends the result in a Response. The string appended is the Data value in the first Init message. The code is as follows:

class AppendServer
{
    private readonly Conversation<ConversationDto> _conversation;
    private readonly IDisposable _subscription;
    private string _suffix;

    public AppendServer(Conversation<ConversationDto> conversation)
    {
        _conversation = conversation;
        _subscription = conversation.Subscribe(OnNext);
    }

    private void OnNext(ConversationDto message)
    {
        // Ignore invalid messages
        switch (message.MessageType)
        {
            case MessageTypes.Init:
                _suffix = message.Data;
                _conversation.Send(new ConversationDto { MessageType = MessageTypes.Ack });
                break;
            case MessageTypes.Request:
                _conversation.Send(new ConversationDto { MessageType = MessageTypes.Response, Data = message.Data + _suffix });
                break;
            case MessageTypes.End:
                _subscription.Dispose();
                _conversation.Dispose();
                break;
        }
    }
}

I'm sure this needs almost no explanation by now. As before, the object subscribes to conversation messages, but there's no need for ObserveOn here as we don't intend to update the UI. OnNext just processes each message as it comes in, sending a reply if appropriate.

So that's it: conversations made (relatively) simple. The idea is that your code can be devoted almost entirely to just processing messages in a single conversation without having to worry about which messages belong to which conversation: every conversation is encapsulated in an object that ensures you only get the messages meant for you, and can only send messages to the right recipient. I did have an earlier implementation which required even less management of objects, but you had to inherit from a ConversationBase class, and that seemed too intrusive: libraries should not force inheritance because the library user may have their own inheritance hierarchy that then has to be distorted to fit the library's view of the world.

There are a few other bits and pieces I haven't gone into here. For example, the Switchboard creation call allows customisation to enable discrimination between different kinds of conversation that use the same message type, or message serialisation using something other than the default DataContractSerializer (serialising into JSON is handy if you want to interoperate with non-.NET clients, for example, and Hydra has a standard serialiser for that purpose). One point to note is that all messages are processed on the same background thread. That means that message processing should be quick; if it will take a long time to act upon a message, then it should be done asynchronously so that code can return quickly from its OnNext calls.
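As a rough sketch of that last point, a handler can queue slow work on the thread pool and return straight away. Everything below is hypothetical illustration: Process stands in for an expensive operation and the reply delegate stands in for whatever sends the response. Whether Conversation.Send may safely be called from another thread is an assumption you would need to check. Task.Run needs .NET 4.5; on .NET 4.0, Task.Factory.StartNew does the same job.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SlowWorkSketch
{
    // Hypothetical stand-in for an expensive operation.
    public static string Process(string data)
    {
        Thread.Sleep(200);
        return data.ToUpperInvariant();
    }

    // Shape of an OnNext-style handler: queue the work and return immediately.
    // 'reply' stands in for whatever sends the response message.
    public static Task OnNext(string data, Action<string> reply)
    {
        return Task.Run(() => reply(Process(data)));
    }
}
```

One consequence of this design is that replies to consecutive requests may be sent out of order, since the queued tasks can finish in any order.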

Last edited May 9, 2012 at 4:51 PM by NickNorth, version 5
