IObservable-based Messaging Broker for Silverlight

Posted by Rishi on 12-Feb-10 2:51 AM - Comments (10)

One of the challenges in modern application development is loosely-coupled communication between the independent components of an application. By and large it is not a technical problem, but one of finding a common denominator and ensuring that the parties to the communication don't entangle each other with dependencies. To address this challenge, nRoute has a messaging framework that decouples application-wide communication using a mediator, against which you can both publish and subscribe without creating direct dependencies. It works much like other mediators such as Prism's Event Aggregator and MVVM Light Toolkit's Messenger; however, here the core concept is based on the observer pattern.

Rx's Observer Pattern

As just mentioned, nRoute's messaging broker is based on the observer pattern, or more specifically on the Rx-framework's interpretation of the observer pattern via its IObserver<T> and IObservable<T> interfaces. In our context, you can think of an Observable as a publisher and an Observer as a consumer. When the consumer wants to consume, it subscribes to the publisher (via the Subscribe method), and from then on the publisher can do three specific things: push a series of values (of an agreed type T, via the OnNext method), indicate that it is done (via the OnCompleted method), or, if an error occurs, relay it (via the OnError method), which also ends the sequence. Further, when we subscribe we get back a token (of type IDisposable) which the consumer can dispose to opt out of, i.e. unsubscribe from, the publisher's output.

[Figure: ObservingProcess]

The other notable thing about the observer setup is that communication is strictly one-way, which allows a single published message to be multiplexed to more than one subscriber. In our broker implementation, we augment the observable with the capability to take in a payload and send it to all registered subscribers.
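To make the publisher/consumer contract concrete, here is a minimal sketch written against the IObservable<T> and IObserver<T> interfaces that ship with .NET 4 and later. SimplePublisher and RecordingObserver are hypothetical names made up for this illustration; they are not part of nRoute or Rx, and the sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical consumer: records every notification it receives.
public sealed class RecordingObserver : IObserver<string>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(string value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error:" + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

// Hypothetical publisher: relays notifications to every current subscriber.
public sealed class SimplePublisher : IObservable<string>
{
    private readonly List<IObserver<string>> _observers = new List<IObserver<string>>();
    private bool _stopped;

    public IDisposable Subscribe(IObserver<string> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        _observers.Add(observer);
        // The returned token is the only way for the consumer to opt out.
        return new Token(() => _observers.Remove(observer));
    }

    // Push one value to all subscribers (one-way, one-to-many).
    public void Next(string value)
    {
        if (_stopped) return;
        foreach (var observer in _observers.ToArray()) observer.OnNext(value);
    }

    // Signal normal completion; nothing is delivered afterwards.
    public void Complete()
    {
        if (_stopped) return;
        _stopped = true;
        foreach (var observer in _observers.ToArray()) observer.OnCompleted();
        _observers.Clear();
    }

    // Relay an error; this also ends the sequence.
    public void Fail(Exception error)
    {
        if (_stopped) return;
        _stopped = true;
        foreach (var observer in _observers.ToArray()) observer.OnError(error);
        _observers.Clear();
    }

    private sealed class Token : IDisposable
    {
        private Action _onDispose;
        public Token(Action onDispose) { _onDispose = onDispose; }

        public void Dispose()
        {
            // Run the unsubscribe action at most once.
            var action = _onDispose;
            _onDispose = null;
            if (action != null) action();
        }
    }
}
```

Subscribing two RecordingObserver instances to the same SimplePublisher and calling Next once delivers the value to both, and disposing one token stops delivery to that observer only.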

Observable Channels

nRoute's messaging framework uses a "channel" metaphor to describe its observable broker (IObservable<T>) implementation. Each channel is uniquely defined by the type of payload it carries (i.e. the type T in IObservable<T>), and against each channel you can both publish a payload and subscribe to receive one or more payloads. Code-wise, these channels are nothing more than generic singleton classes that implement the IObservable<T> interface along with a publish mechanism which, as I alluded to, pushes the payload to all the subscribed IObserver<T> consumers.

[Figure: ObservableChannels]

As seen above, each channel is represented by the Channel<T> class, which implements IObservable<T> and inherits from the Channel base class, and its Manager property gives access to the only instance of the singleton Channel<T> class. Further, as the class definition shows, a Channel<T> allows for three things: publishing a message of type T, publishing a message of type T asynchronously, and subscribing to the channel, which yields an IDisposable. The Channel base class gives you convenient access to the same three functions, in both generic and non-generic forms. Note that you do not have to create any channel object yourself, and that the payload type T must be a reference type, i.e. a class.
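The "generic singleton per payload type" idea can be sketched in a few lines. The SketchChannel<T> and SketchChannel classes below are hypothetical stand-ins written for this illustration and are not nRoute's actual Channel<T> implementation; asynchronous publishing, thread options and weak references are left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a payload-typed channel (NOT nRoute's Channel<T>).
public sealed class SketchChannel<T> : IObservable<T> where T : class
{
    // A static field on a generic class exists once per closed type, so
    // SketchChannel<A>.Manager and SketchChannel<B>.Manager are distinct channels.
    public static readonly SketchChannel<T> Manager = new SketchChannel<T>();

    private readonly object _gate = new object();
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    private SketchChannel() { }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        lock (_gate) { _observers.Add(observer); }
        return new Subscription(this, observer);
    }

    // Convenience overload: subscribe with just a payload handler.
    public IDisposable Subscribe(Action<T> onNext)
    {
        if (onNext == null) throw new ArgumentNullException("onNext");
        return Subscribe(new ActionObserver(onNext));
    }

    // Push the payload to a snapshot of the current subscribers.
    public void Publish(T payload)
    {
        IObserver<T>[] snapshot;
        lock (_gate) { snapshot = _observers.ToArray(); }
        foreach (var observer in snapshot) observer.OnNext(payload);
    }

    private void Remove(IObserver<T> observer)
    {
        lock (_gate) { _observers.Remove(observer); }
    }

    private sealed class ActionObserver : IObserver<T>
    {
        private readonly Action<T> _onNext;
        public ActionObserver(Action<T> onNext) { _onNext = onNext; }
        public void OnNext(T value) { _onNext(value); }
        public void OnError(Exception error) { }   // never raised by this sketch
        public void OnCompleted() { }              // never raised by this sketch
    }

    private sealed class Subscription : IDisposable
    {
        private readonly SketchChannel<T> _channel;
        private readonly IObserver<T> _observer;

        public Subscription(SketchChannel<T> channel, IObserver<T> observer)
        {
            _channel = channel;
            _observer = observer;
        }

        public void Dispose() { _channel.Remove(_observer); }
    }
}

// Non-generic helper so that the payload type can be inferred at the call site.
public static class SketchChannel
{
    public static void Publish<T>(T payload) where T : class
    {
        SketchChannel<T>.Manager.Publish(payload);
    }
}
```

With this sketch, SketchChannel.Publish(log) and SketchChannel<LogInfo>.Manager.Publish(log) reach the same subscribers, because the compiler infers T from the argument and both calls resolve to the same static instance.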

Publishing to any channel couldn't be simpler; below we look at six ways to publish a log message of type LogInfo, both synchronously and asynchronously:

    void PublishLog(LogInfo log)
    {
        // using the Channel<T> singleton class
        Channel<LogInfo>.Manager.Publish(log);
        Channel<LogInfo>.Manager.PublishAsync(log);

        // using the Channel class generic methods (note the generic type is inferred)
        Channel.Publish(log);
        Channel.PublishAsync(log);

        // using the Channel class's weakly-typed methods
        Channel.Publish(typeof(LogInfo), log);
        Channel.PublishAsync(typeof(LogInfo), log);
    }

The code above should be self-explanatory, with the main take-away being that communication is done via channels, which themselves are defined by the type of payload they carry. One other point of note is that an observable channel cannot currently publish an exception (which subscribers would receive via the OnError method) or indicate that the channel has completed (which they would receive via the OnCompleted method), because a channel remains open throughout the application's lifespan.

Disposable Subscribers

Given the IObservable<T> channels, subscribers need to be of type IObserver<T>, and for ease of use nRoute has a built-in ChannelObserver<T> class that implements IObserver<T>.

[Figure: ChannelObservers]

The ChannelObserver<T> class basically helps maintain a subscription to a channel of type T; accordingly it has an IsSubscribed property, along with Subscribe and Unsubscribe methods. The channel observer class also implements IDisposable, so when you are through with it you can just call its Dispose method and it will unsubscribe from the channel if required. Additionally, the ChannelObserver<T> class surfaces some further options: a subscription thread option, the choice of a strongly-referenced subscription (by default all subscriptions are weakly-referenced), and an optional delegate-based filter. Below is a simple usage snippet; note that you have to explicitly call the Subscribe method to start observing a channel.

    // create the observer
    var _observer = new ChannelObserver<LogInfo>((l) => ProcessLog(l));

    // subscribe on the UI Thread
    _observer.Subscribe(ThreadOption.UIThread);

    // and unsubscribe
    _observer.Unsubscribe();

I hope the code speaks for itself: we create the channel observer by providing a lambda to handle any incoming log info, and subscribe and unsubscribe as need be. Note that you can subscribe and unsubscribe multiple times using the same observer, as long as you have not disposed of it. Also, understand that the ChannelObserver<T> class is just a candy-implementation of IObserver<T> which, for convenience's sake, manages the IDisposable token internally; you can alternatively furnish any custom implementation of IObserver<T>, so for example:

    // subscribe to channel
    var _unsubscribeToken = Channel<LogInfo>.Manager.Subscribe((l) => ProcessLog(l));

    // and to unsubscribe
    _unsubscribeToken.Dispose();

Here we are using nRoute's Subscribe extension method to subscribe by passing in a lambda handler, which yields a disposable token that can be used to unsubscribe from the channel. Now if you wanted to subscribe with, say, threading options, you just need to pass in the observer explicitly, as shown below:

    // create the observer
    var _observer = new RelayObserver<LogInfo>((l) => ProcessLog(l), null, null);

    // subscribe
    var _token = Channel<LogInfo>.Manager.Subscribe(_observer, ThreadOption.UIThread);

    // and to unsubscribe
    _token.Dispose();

RelayObserver<T> is a generic implementation of IObserver<T> in nRoute that takes three delegates, for handling a payload, an exception and a notification of completion respectively. A complementary RelayObservable<T> implementation of IObservable<T> also exists in nRoute, which helps manage one or more IObserver<T> subscribers.
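For readers who want to see what a delegate-based observer of this kind might look like, here is a minimal sketch. DelegateObserver<T> is a hypothetical class written for this illustration, not nRoute's RelayObserver<T>; in particular, treating a null error or completion delegate as "ignore the notification" is an assumption of the sketch.

```csharp
using System;

// Hypothetical delegate-based observer (NOT nRoute's RelayObserver<T>).
public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        if (onNext == null) throw new ArgumentNullException("onNext");
        _onNext = onNext;
        _onError = onError;          // may be null: errors are then ignored
        _onCompleted = onCompleted;  // may be null: completion is then ignored
    }

    // Forward each payload to the payload handler.
    public void OnNext(T value) { _onNext(value); }

    // Forward an error only if a handler was supplied.
    public void OnError(Exception error)
    {
        if (_onError != null) _onError(error);
    }

    // Forward completion only if a handler was supplied.
    public void OnCompleted()
    {
        if (_onCompleted != null) _onCompleted();
    }
}
```

Because the class only implements IObserver<T>, an instance can be handed to any IObservable<T>.Subscribe call, and the caller keeps the returned IDisposable to unsubscribe.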

Dedicated Subscribers

In addition to the disposable subscribers, you can also create dedicated subscribers that subscribe automatically and work without any direct user interaction (somewhat like services). This is sometimes useful when you want to create dedicated sinks for a channel. Let's look at a simple logging example:

    [MapChannelObserver(typeof(LogInfo), "IsolatedLogger",
        InitializationMode=InitializationMode.WhenAvaliable,
        Lifetime=InstanceLifetime.Singleton)]
    public class IsolatedLoggerSink : IObserver<LogInfo>
    {

    #region IObserver<LogInfo> Members

        public void OnCompleted() { /* .. */ }

        public void OnError(Exception exception) { /* .. */ }

        public void OnNext(LogInfo value) { /* .. */ }

    #endregion

    }

Above, just like any subscriber to the LogInfo channel, the IsolatedLoggerSink class implements IObserver<LogInfo>; additionally we decorate it with the MapChannelObserver attribute. The mapping attribute is based on the Resource Locator Framework: it automatically registers an instance of the class as a subscriber to the channel, and also helps manage its lifetime and initialization. In the case above, we've set the initialization to be done as soon as the resource is available, and the lifetime to be singleton.

[Figure: MapChannelObserver]

Normally, once you have decorated the class with the MapChannelObserver attribute you don't have to deal with the subscription yourself. However, in cases where you want manual control you can use the ChannelObserverLocator static class shown above; with it you can resolve any registered instance and explicitly consume it.

Threads, Async and Weak-Reference Issues

One of the features of nRoute's messaging framework is that it allows you to publish either on the publisher's working thread or asynchronously on a background thread, and on the subscriber's end you can consume on the publisher's thread, on a background thread, or on the main UI thread (see the ThreadOption enum type). By default all subscribers use the publisher's thread. In most cases this works fine, but you have to be careful (by manually specifying the ThreadOption) in cases where the subscriber updates or affects UI controls, as otherwise it can lead to cross-thread access exceptions. Also, when you consume or publish asynchronously there is a non-trivial provisioning overhead associated with sending and receiving the payload asynchronously for each subscriber, so use it selectively, preferably with coarse-grained operations.
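One way to picture per-subscriber thread switching is an observer wrapper that decides where each payload handler runs. The sketch below is only an illustration under stated assumptions: DeliveryOption and MarshalingObserver<T> are hypothetical names (they are not nRoute's ThreadOption or its internals), and a captured SynchronizationContext stands in for the UI thread.

```csharp
using System;
using System.Threading;

// Hypothetical counterpart to a thread option (NOT nRoute's ThreadOption enum).
public enum DeliveryOption { PublisherThread, BackgroundThread, CapturedContext }

// Hypothetical wrapper that marshals each payload to the requested thread.
public sealed class MarshalingObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly DeliveryOption _option;
    private readonly SynchronizationContext _context;

    public MarshalingObserver(Action<T> onNext, DeliveryOption option,
                              SynchronizationContext context = null)
    {
        if (onNext == null) throw new ArgumentNullException("onNext");
        _onNext = onNext;
        _option = option;
        // On a UI thread, SynchronizationContext.Current posts back to that thread.
        _context = context ?? SynchronizationContext.Current;
        if (option == DeliveryOption.CapturedContext && _context == null)
            throw new InvalidOperationException("No synchronization context to capture.");
    }

    public void OnNext(T value)
    {
        switch (_option)
        {
            case DeliveryOption.BackgroundThread:
                // Queue the handler on the thread pool; the publisher does not wait.
                ThreadPool.QueueUserWorkItem(_ => _onNext(value));
                break;
            case DeliveryOption.CapturedContext:
                // Post asynchronously to the captured (for example UI) context.
                _context.Post(_ => _onNext(value), null);
                break;
            default:
                // Run inline on whichever thread the publisher is using.
                _onNext(value);
                break;
        }
    }

    public void OnError(Exception error) { }   // not used by this sketch
    public void OnCompleted() { }              // not used by this sketch
}
```

The two asynchronous options each queue a work item per payload and per subscriber, which is where the provisioning overhead mentioned above comes from.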

[Figure: ChannelsThreading]

Another important point is that, by default, the channel's internal references to its subscribers are weak references, which ensures that if a subscriber falls out of scope or use, its subscription is automatically removed on the next publishing cycle. For most cases this has a negligible effect on performance; however, for performance reasons or otherwise, you can choose to have the channel hold a strong reference instead, in which case you must explicitly unsubscribe, or else the subscriber's reference will be held indefinitely by the channel, potentially creating a memory leak. In any case, the best practice is to always unsubscribe from a channel when done.
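The weak-reference bookkeeping described above can be sketched as follows. WeakSubscriberList<T> and CountingObserver<T> are hypothetical classes written for this illustration; they are not nRoute's internals, and the sketch is not thread-safe. It uses the non-generic WeakReference class and purges entries whose target has been collected whenever a payload is published.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper observer used to try the list out.
public sealed class CountingObserver<T> : IObserver<T>
{
    public int Count;
    public void OnNext(T value) { Count++; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

// Hypothetical subscriber list holding weak references by default.
public sealed class WeakSubscriberList<T> where T : class
{
    private readonly List<WeakReference> _weak = new List<WeakReference>();
    private readonly List<IObserver<T>> _strong = new List<IObserver<T>>();

    // keepAlive: true makes the list hold a strong reference, so the
    // subscriber must be removed explicitly or it will never be collected.
    public void Add(IObserver<T> observer, bool keepAlive = false)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        if (keepAlive) _strong.Add(observer);
        else _weak.Add(new WeakReference(observer));
    }

    public bool Remove(IObserver<T> observer)
    {
        if (_strong.Remove(observer)) return true;
        return _weak.RemoveAll(w => ReferenceEquals(w.Target, observer)) > 0;
    }

    // Delivers the payload to every live subscriber and returns how many
    // received it; weak entries whose target is gone are purged on the way.
    public int Publish(T payload)
    {
        var live = new List<IObserver<T>>(_strong);
        _weak.RemoveAll(w =>
        {
            var target = w.Target as IObserver<T>;
            if (target == null) return true;   // collected: drop the entry
            live.Add(target);                  // still alive: deliver to it
            return false;
        });
        foreach (var observer in live) observer.OnNext(payload);
        return live.Count;
    }
}
```

A weakly-held subscriber only keeps receiving payloads while something else in the application still references it, which is why a subscriber created and subscribed inside a short-lived method may silently stop receiving messages after a garbage collection.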

Future Enhancements

In the next drop of nRoute, I'm looking to put in place two main enhancements. The first is to allow explicit publishing of exceptions, so just as we can publish a payload in a channel we will be able to publish an exception through the same channel. This gels with our normal understanding that any operation can have two possible outcomes, one as defined by the operation's contract and the other being an exception passed up the call-stack. In the same vein, in our decoupled pub/sub type of communication we should be able to explicitly "raise" (i.e. send) an error to all of the channel's subscribers. The second improvement I seek is to allow "private channels" that can be uniquely identified and consumed using a shared key; unlike the public channels, private channels will be temporal, so their owners can close them as required. Private channels will be functionally similar to public channels, except that they will have to be addressed using their shared key.

Owing to the ongoing development of the Rx-framework, nRoute currently does not have a binary dependency on it, although we have equivalent IObservable<T> and IObserver<T> definitions present. However, as the Rx-framework stabilizes, future iterations of nRoute will take a direct dependency on it, which quite beneficially will bring into play all the operators and observer constructs from Rx.

Summary

So in this post I've covered the ins and outs of nRoute's messaging framework, with the basic outline being that it can help decouple your inter-component communication by playing an intermediary role. To press the point further, in my next post I'll put up an example of how, in an MVVM setup, such mediated communication can play an important role in keeping things sane.

Comments (10) -

Ryan Riley United States
on 07-Feb-10 3:30 PM
I know you are waiting for Rx to become more stable, but why not use something like the Subject<T> or AsyncSubject<T> in Rx for a Channel<T> and any IObserver<T> for a ChannelObserver<T>? Why have the channel observer maintain its own subscription?

Rishi
on 08-Feb-10 1:16 AM
Hi Ryan, good questions. There are a couple of reasons for not using/implementing ISubject<T>. First, when I initially thought of it (and as it currently stands) I did NOT want to expose OnError or OnCompleted on a Channel, and even now I don't want to expose OnCompleted on a Channel as the lifetime of a Channel spans that of the Application itself. Secondly, all channels offer both synchronous and asynchronous "publishing" and I'm not sure how, using an IObservable<T> implementation, I could get that semantic across properly - an overload maybe, but that's outside the IObservable interface. And thirdly, and this is the most important reason, within a Channel I do thread/context switching - so a publisher can publish from a background thread, and on the same Channel one subscriber can consume on the UI thread while another consumes the payload on another thread. If I am correct, Rx even today does not support this level of per-subscriber/per-publisher synchronization. And lastly, by having a Channel maintain its own subscriptions I support neat things like weakly-referenced subscribers, along with support for automatically purging out-of-scope/non-alive subscribers.

About the ChannelObserver<T> class, it's what I called a "candy-implementation" of IObserver<T> because it's just a simple wrapper; you can totally do without it. I just put it in because I thought it offered a somewhat easier to understand semantic of Subscribe/Unsubscribe rather than the nonspecific IDisposable handle.

Hope that makes sense. Cheers.

Alexey Zakharov Russia
on 17-Feb-10 4:24 AM
Hi Rishi,

Nice post. But I think that you should try to integrate it deeply into Rx. Rx already provides schedulers that should cover the thread options API you are providing. The predefined subjects (ReplaySubject, AsyncSubject, Subject) are already channels.

Regards,
Alexey Zakharov

Rishi
on 20-Feb-10 7:03 PM
@Alexey, since I wrote this post and replied to Ryan, I've had a bit of a change of heart, in that the new version of this IObservable broker will indeed be based on an implementation of ISubject<T>; however, I've extended it to include some options for publishing/subscribing asynchronously. Also included in the new version is an implementation of private channels, which would again be ISubject<T> based.

Now, true, I just checked and Rx now includes an option to subscribe on different dispatchers, which is quite nice - but it wasn't there when I started. And even then, I'm still not convinced about taking a direct dependency on the Rx-framework, because then I'll need to march to their schedule, or otherwise binary compatibility between releases will always be an issue. So instead, I'm asking the SL folks to include both the IObserver<T> and IObservable<T> interfaces in Silverlight 4, just like in the .NET 4 BCL. You can vote for this suggestion here:

dotnet.uservoice.com/.../523437-include-iobserver-t-and-iobservable-t-interface

If this happens, then we can have binary compatibility between nRoute and anything the Rx-framework guys release, and that would be great because it would mean we can use most of the operators in Rx, such as SubscribeOnDispatcher(), without nRoute having to double in size. Let's see how it goes, but I definitely want full alignment with the Rx-framework.

Vish United States
on 14-Apr-10 11:05 AM
Hi Rishi,

Nice work. I was hoping you could shed some light on a couple of things here:

- So, I can't create two channels which have the same payload? Think private communication between controls with the same payload.

- Why singletons? Why not use an IoC container to manage the lifetime/instance as a singleton? Or is that what happens under the covers?

Also, I think it would be really helpful if you could publish these numerous little utilities hidden in the nRoute framework as independent code snippets or a library. I am constrained to use the PRISM framework because it has the MS seal on it. And also, nRoute always seems to be in a state of constant development... And I don't want to bloat up my Silverlight app with other things in the framework that I don't use.

Great work on the framework by the way... love it.

Thank You,
Vish
