One of the challenges of modern application development is loosely coupled communication between the independent components of an application. By and large it is not a technical problem, but one of finding a common denominator and ensuring that the parties to the communication don't entangle each other with dependencies. To address this challenge, nRoute has a messaging framework that decouples application-wide communication using a mediator, against which you can both publish and subscribe without creating direct dependencies. It works much like other mediators such as Prism's Event Aggregator and MVVM Light Toolkit's Messenger; here, however, the core concept is based on the observer pattern.
Rx's Observer Pattern
As just mentioned, nRoute's messaging broker is based on the observer pattern, or more specifically on the Rx-framework's interpretation of the observer pattern by way of its IObserver<T> and IObservable<T> interfaces. In our context, you can think of an Observable as a publisher and an Observer as a consumer. When the consumer wants to consume, it subscribes to the publisher (via the Subscribe method), and from then on the publisher can do three specific things: push a series of values (of an agreed type T, via the OnNext method), indicate that it is done (via the OnCompleted method), or relay an error if one occurs (via the OnError method), which also stops the process. Further, when we subscribe we get back a disposable token (of type IDisposable), which the consumer can use to opt out of, i.e. unsubscribe from, the publisher's output.

The other notable thing about the observer setup is that the communication is all one-way, which allows a single published message to be multiplexed to more than one subscriber. In our broker implementation, we augment the observable with the ability to take in a payload and send it to all registered subscribers.
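To make the contract concrete, here is a minimal sketch of the observer setup described above. It uses only the IObservable<T> and IObserver<T> interfaces from the .NET base class library; SimplePublisher<T> and RecordingObserver<T> are hypothetical names of my own for illustration and are not nRoute types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical publisher (not an nRoute type): the bare observer contract.
public sealed class SimplePublisher<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // Subscribing hands back a token; disposing it removes the observer.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // Multiplex one value to every current subscriber.
    public void Publish(T value)
    {
        // Iterate over a copy so observers may unsubscribe while being notified.
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    // Signal the end of the sequence and drop all subscribers.
    public void Complete()
    {
        foreach (var observer in _observers.ToArray())
            observer.OnCompleted();
        _observers.Clear();
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() { _observers.Remove(_observer); }
    }
}

// Hypothetical consumer that simply records what it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<T> Received = new List<T>();
    public bool Completed { get; private set; }
    public bool Faulted { get; private set; }

    public void OnNext(T value) { Received.Add(value); }
    public void OnError(Exception error) { Faulted = true; }
    public void OnCompleted() { Completed = true; }
}
```

Two observers subscribed to the same SimplePublisher<string> both receive each published value, and an observer whose token has been disposed receives nothing further.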
Observable Channels
nRoute's messaging framework uses a "channel" metaphor to describe its observable broker (IObservable<T>) implementation. Each channel is uniquely defined by the type of the payload it carries (i.e. type T in IObservable<T>), and against each channel you can both publish a payload and subscribe for one or more payloads. Code-wise, these channels are nothing more than generic singleton classes that implement the IObservable<T> interface along with a publish mechanism, which, as I alluded to earlier, pushes the payload to all the subscribed IObserver<T> consumers.

Each channel is represented by the Channel<T> class, which implements IObservable<T> and inherits from the Channel base class, and its Manager property gives access to the only instance of the singleton Channel<T> class. Further, as the class definition shows, a Channel<T> class allows for three things: publishing a message of type T, publishing a message of type T asynchronously, or subscribing to the channel, which yields an IDisposable. The Channel base class gives you convenient access to the same three functions, in both generic and non-generic forms. Note that you do not have to create any channel object yourself, and that the payload type T must be a reference type, i.e. a class.
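To illustrate the "one singleton per payload type" idea, here is a rough sketch of how such a channel could be shaped. This is not nRoute's actual source: SketchChannel<T>, the non-generic SketchChannel facade and ListObserver<T> are hypothetical names, and only a synchronous Publish is shown.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a channel; NOT nRoute's actual Channel<T>.
public sealed class SketchChannel<T> : IObservable<T> where T : class
{
    // A static field exists once per closed generic type,
    // so there is exactly one channel per payload type.
    private static readonly SketchChannel<T> _manager = new SketchChannel<T>();

    private readonly object _lock = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    private SketchChannel() { }

    public static SketchChannel<T> Manager { get { return _manager; } }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        lock (_lock) { _observers.Add(observer); }
        return new Token(this, observer);
    }

    // Push the payload to every subscriber registered at this moment.
    public void Publish(T payload)
    {
        IObserver<T>[] snapshot;
        lock (_lock) { snapshot = _observers.ToArray(); }
        foreach (var observer in snapshot)
            observer.OnNext(payload);
    }

    private sealed class Token : IDisposable
    {
        private readonly SketchChannel<T> _channel;
        private readonly IObserver<T> _observer;

        public Token(SketchChannel<T> channel, IObserver<T> observer)
        {
            _channel = channel;
            _observer = observer;
        }

        public void Dispose()
        {
            lock (_channel._lock) { _channel._observers.Remove(_observer); }
        }
    }
}

// Hypothetical non-generic facade: the payload type is inferred from the argument.
public static class SketchChannel
{
    public static void Publish<T>(T payload) where T : class
    {
        SketchChannel<T>.Manager.Publish(payload);
    }
}

// Hypothetical helper observer that collects payloads in a list.
public sealed class ListObserver<T> : IObserver<T>
{
    public readonly List<T> Items = new List<T>();
    public void OnNext(T value) { Items.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

With this shape, SketchChannel<string>.Manager.Publish("x") and SketchChannel.Publish("x") reach the same subscribers, because both resolve to the single instance that exists for the string payload type.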
Publishing to a channel couldn't be simpler. Below we look at six ways to publish a log message of type LogInfo, both synchronously and asynchronously:
    void PublishLog(LogInfo log)
    {
        // using the Channel<T> singleton class
        Channel<LogInfo>.Manager.Publish(log);
        Channel<LogInfo>.Manager.PublishAsync(log);

        // using the Channel class generic methods (note the generic type is inferred)
        Channel.Publish(log);
        Channel.PublishAsync(log);

        // using the Channel class's weakly-typed methods
        Channel.Publish(typeof(LogInfo), log);
        Channel.PublishAsync(typeof(LogInfo), log);
    }
The code above should be self-explanatory, with the main take-away being that communication is done via channels, which themselves are defined by the type of the payload they carry. One other point to note is that with an observable channel you cannot currently publish an exception (receivable via the OnError method) or indicate that the channel is completed (receivable via the OnCompleted method), as a channel remains open throughout the application's lifespan.
Disposable Subscribers
Given that channels are IObservable<T>, subscribers need to be of type IObserver<T>, and for ease of use nRoute has a built-in ChannelObserver<T> class that implements IObserver<T>.
The ChannelObserver<T> class helps maintain a subscription to a channel of type T, and accordingly it has an IsSubscribed property along with Subscribe and Unsubscribe methods. The channel observer class also implements IDisposable, so when you are through with it you can just call its Dispose method and it will unsubscribe from the channel if required. Additionally, the ChannelObserver<T> class surfaces some further options: the subscription thread option, whether you would like a strongly-referenced subscription (by default all subscriptions are weakly-referenced), and an optional delegate-based filter. Below is a simple usage snippet; note that you have to explicitly call the Subscribe method to start observing a channel.
    // create the observer
    var _observer = new ChannelObserver<LogInfo>((l) => ProcessLog(l));

    // subscribe on the UI Thread
    _observer.Subscribe(ThreadOption.UIThread);

    // and unsubscribe
    _observer.Unsubscribe();
I hope the code speaks for itself: we create the channel observer by providing a lambda to handle any incoming log info, and subscribe and unsubscribe as need be. Note that you can subscribe and unsubscribe multiple times using the same observer, as long as you have not disposed of it. Also, understand that the ChannelObserver<T> class is just a candy-implementation of IObserver<T> which, for convenience's sake, manages the IDisposable token internally. You can alternatively furnish any custom implementation of IObserver<T>, so for example:
    // subscribe to channel
    var _unsubscribeToken = Channel<LogInfo>.Manager.Subscribe((l) => ProcessLog(l));

    // and to unsubscribe
    _unsubscribeToken.Dispose();
Here we are using the Subscribe extension method in nRoute to subscribe by passing in a lambda handler, which yields a disposable token that can be used to unsubscribe from the channel. Now, if you wanted to subscribe with, say, threading options, you just need to pass in the observer explicitly, as shown below:
    // create the observer
    var _observer = new RelayObserver<LogInfo>((l) => ProcessLog(l), null, null);

    // subscribe
    var _token = Channel<LogInfo>.Manager.Subscribe(_observer, ThreadOption.UIThread);

    // and to unsubscribe
    _token.Dispose();
RelayObserver<T> is a generic implementation of IObserver<T> in nRoute that takes in three delegates, for handling a payload, an exception, and a notification of completion respectively. A complementary RelayObservable<T> implementation of IObservable<T> also exists in nRoute, which helps manage one or more IObserver<T> subscribers.
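For readers curious what a delegate-based observer of this kind looks like, here is a minimal sketch. DelegateObserver<T> is a hypothetical name; I am assuming, based on the snippet above where null is passed for the last two arguments, that the error and completion handlers are optional. It is not nRoute's RelayObserver<T> source.

```csharp
using System;

// Hypothetical delegate-based observer; a sketch, not nRoute's RelayObserver<T>.
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;   // may be null
    private readonly Action _onCompleted;          // may be null

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        if (onNext == null) throw new ArgumentNullException("onNext");
        _onNext = onNext;
        _onError = onError;
        _onCompleted = onCompleted;
    }

    // Each notification is simply relayed to the matching delegate, if one was given.
    public void OnNext(T value) { _onNext(value); }

    public void OnError(Exception error)
    {
        if (_onError != null) _onError(error);
    }

    public void OnCompleted()
    {
        if (_onCompleted != null) _onCompleted();
    }
}
```

The appeal of this design is that a subscriber can be declared inline with a lambda, without writing a dedicated class for every payload type.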
Dedicated Subscribers
In addition to the disposable subscribers, you can also create dedicated subscribers that automatically subscribe and work without any direct user interaction (kind of like services). This is sometimes useful when you want to create dedicated sinks for a channel. Let's look at a simple logging example:
    [MapChannelObserver(typeof(LogInfo), "IsolatedLogger",
        InitializationMode=InitializationMode.WhenAvaliable,
        Lifetime=InstanceLifetime.Singleton)]
    public class IsolatedLoggerSink : IObserver<LogInfo>
    {
        #region IObserver<LogInfo> Members

        public void OnCompleted() { /* .. */ }

        public void OnError(Exception exception) { /* .. */ }

        public void OnNext(LogInfo value) { /* .. */ }

        #endregion
    }
So above, just like any subscriber to the LogInfo channel, the IsolatedLoggerSink class implements IObserver<LogInfo>, and additionally we decorate it with the MapChannelObserver attribute. The mapping attribute is based on the Resource Locator Framework: it automatically registers an instance of the class as a subscriber to the channel, and also helps manage the lifetime and initialization work. In the case above, we've set the initialization to be done as soon as the resource is available and the lifetime to be singleton.

Normally, once you have decorated the class with the MapChannelObserver attribute you don't have to deal with the subscription yourself. However, in cases where you want manual control you can use the ChannelObserverLocator static class shown above; with it you can resolve any registered instance and explicitly consume it.
Threads, Async and Weak-Reference Issues
One of the features of nRoute's messaging framework is that it allows you to publish either on the publisher's working thread or asynchronously on a background thread, and on the subscriber's end you can consume on the publisher's thread, on a background thread, or on the main UI thread (see the ThreadOption enum type). By default all subscribers use the publisher's thread, and in most cases this works fine, but you have to be careful (by manually specifying the ThreadOption) in cases where the subscriber updates or affects UI controls, as it can lead to cross-threading exceptions. Also, when you are consuming or publishing asynchronously there is a non-trivial provisioning overhead associated with sending and receiving the payload asynchronously for each subscriber, so use it selectively, preferably with coarse-grained operations.
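As a rough illustration of the three delivery choices, the sketch below wraps a handler and decides where it runs. Everything here is an assumption for illustration: DeliveryOption and DispatchingObserver<T> are hypothetical names that only mirror the idea behind ThreadOption, and I use the standard SynchronizationContext and ThreadPool classes from .NET; nRoute's own dispatching may well work differently.

```csharp
using System;
using System.Threading;

// Hypothetical enum for this sketch; it only mirrors the idea of nRoute's ThreadOption.
public enum DeliveryOption { PublisherThread, BackgroundThread, CapturedContext }

// Hypothetical observer that runs its handler where the option says it should.
public sealed class DispatchingObserver<T> : IObserver<T>
{
    private readonly Action<T> _handler;
    private readonly DeliveryOption _option;
    private readonly SynchronizationContext _context; // e.g. the UI thread's context

    public DispatchingObserver(Action<T> handler, DeliveryOption option,
                               SynchronizationContext context)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        if (option == DeliveryOption.CapturedContext && context == null)
            throw new ArgumentNullException("context");
        _handler = handler;
        _option = option;
        _context = context;
    }

    public void OnNext(T value)
    {
        switch (_option)
        {
            case DeliveryOption.BackgroundThread:
                // Queue the work; the publisher does not wait for it.
                ThreadPool.QueueUserWorkItem(_ => _handler(value));
                break;
            case DeliveryOption.CapturedContext:
                // Marshal the call to the thread that owns the context.
                _context.Post(_ => _handler(value), null);
                break;
            default:
                // Run inline on whichever thread called Publish.
                _handler(value);
                break;
        }
    }

    // Channels in this post never complete or fault, so these are no-ops here.
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

In a UI application you would typically capture SynchronizationContext.Current on the UI thread and pass it in with the CapturedContext option; the queueing in the two asynchronous branches is also where the per-subscriber overhead mentioned above comes from.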
Another important point is that, by default, the channel holds only weak references to its subscribers, which ensures that if a subscriber falls out of scope or use, its subscription is automatically removed on the next publishing cycle. For most cases this has a negligible effect on performance. However, for performance reasons or otherwise, you can choose to have the channel hold a strong reference instead, in which case you must explicitly unsubscribe, or else the subscriber's reference will be held indefinitely by the channel, potentially creating memory leaks. In any case, the best practice is to always unsubscribe from a channel when done.
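The weak-versus-strong trade-off can be sketched with the standard WeakReference<T> class. WeakSubscriptions<T> and CountingObserver<T> below are hypothetical names of my own, and the pruning strategy (dropping dead entries at the start of each publish) is just my reading of "removed on the next publishing cycle", not a copy of nRoute's internals.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical subscriber list; a sketch of weakly-held subscriptions.
public sealed class WeakSubscriptions<T>
{
    private sealed class Entry
    {
        public readonly WeakReference<IObserver<T>> Weak;
        // Only set for strongly-referenced subscriptions; it keeps the observer alive.
        public readonly IObserver<T> Strong;

        public Entry(IObserver<T> observer, bool keepAlive)
        {
            Weak = new WeakReference<IObserver<T>>(observer);
            Strong = keepAlive ? observer : null;
        }
    }

    private readonly List<Entry> _entries = new List<Entry>();

    public void Add(IObserver<T> observer, bool keepAlive)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        _entries.Add(new Entry(observer, keepAlive));
    }

    // Explicit unsubscribe; required for strongly-held observers to be released.
    public void Remove(IObserver<T> observer)
    {
        _entries.RemoveAll(e =>
        {
            IObserver<T> target;
            return e.Weak.TryGetTarget(out target) && ReferenceEquals(target, observer);
        });
    }

    // Prunes collected subscribers, then delivers; returns how many were reached.
    public int Publish(T payload)
    {
        var live = new List<IObserver<T>>();
        _entries.RemoveAll(e =>
        {
            IObserver<T> target;
            if (e.Weak.TryGetTarget(out target)) { live.Add(target); return false; }
            return true; // the observer was garbage collected, so drop the entry
        });
        foreach (var observer in live)
            observer.OnNext(payload);
        return live.Count;
    }
}

// Hypothetical helper observer that counts payloads.
public sealed class CountingObserver<T> : IObserver<T>
{
    public int Count { get; private set; }
    public void OnNext(T value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

An entry added with keepAlive set to true can never be pruned automatically, which is exactly why a strongly-referenced subscription has to be removed by hand.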
Future Enhancements
In the next drop of nRoute, I'm looking to put in place two main enhancements. The first is to allow explicit publishing of exceptions, so just as we can publish a payload in a channel we will be able to publish an exception through the same channel. This gels with our normal understanding that any operation can have two possible outcomes, one as defined by the operation's contract and the other being an exception passed through the call-stack. In the same vein, in our decoupled pub/sub type of communication we should be able to explicitly "raise" (i.e. send) an error to all the channel's subscribers. The second improvement I seek is to allow "private channels" that can be uniquely identified and consumed using a shared key. Unlike the public channels, private channels will be temporal, so their owners can close them as required. Private channels will be functionally similar to public channels, except that they will have to be addressed using their shared key.
Owing to the ongoing development of the Rx-framework, nRoute currently does not have a binary dependency on it, although equivalent IObservable<T> and IObserver<T> definitions are present. However, as the Rx-framework stabilizes, future iterations of nRoute will take a direct dependency on it, which quite beneficially will bring into play all the operators and observer constructs from Rx.
Summary
So in this post I've covered the ins and outs of nRoute's messaging framework, with the basic outline being that it can help decouple your inter-component communication by playing an intermediary role. To press the point further, in my next post I'll put up an example of how, in an MVVM setup, such mediated communication can play an important role in keeping things sane.