Using MemBus

15 Sep 2010 in patterns | csharp | membus

Usage of MemBus always goes through the IBus interface:

public interface IBus : IDisposable
{
    void Publish(object message);
    IDisposable Subscribe<M>(Action<M> subscription);
    IDisposable Subscribe<M>(Action<M> subscription, ISubscriptionShaper customization);
    IDisposable Subscribe<M>(Action<M> subscription, Action<ISubscriptionCustomizer<M>> customization);
    IObservable<M> Observe<M>();
}

Whenever you want to…well…publish a message, you use the Publish method. It will accept an instance of anything.

Subscribing offers more options. The IBus interface lets you subscribe by passing in a method. You can also register handlers, i.e. instances whose purpose is to handle messages of a certain type. That will be the subject of another post.

The standard subscription takes a method and returns an IDisposable. By default MemBus follows the pattern we will be seeing in IObservable implementations: when you dispose the returned object, your subscription is guaranteed to be removed from MemBus. I plan to add a weak-reference based alternative that will let you forget about subscriptions, but for now it is the subscriber's responsibility to remove them:

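var received = 0;
var d = bus.Subscribe<MessageA>(msg => received++);
bus.Publish(new MessageA());
received.ShouldBeEqualTo(1);
d.Dispose(); // removes the subscription from the bus
bus.Publish(new MessageA());
received.ShouldBeEqualTo(1); // still 1: the second message was not delivered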
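
To make the dispose-to-unsubscribe pattern more tangible, here is a minimal sketch of how such a subscription token could work. This is not MemBus's implementation: TinyBus, Unsubscriber and TinyBusDemo are hypothetical names made up for illustration, and the sketch ignores thread safety entirely.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a bus; NOT how MemBus is implemented.
public sealed class TinyBus
{
    private readonly List<Action<object>> handlers = new List<Action<object>>();

    public void Publish(object message)
    {
        // Iterate over a copy so a handler may dispose its own subscription while running.
        foreach (var handler in handlers.ToArray())
            handler(message);
    }

    public IDisposable Subscribe<M>(Action<M> subscription)
    {
        // Forward only the messages that are of type M.
        Action<object> handler = message =>
        {
            if (message is M) subscription((M)message);
        };
        handlers.Add(handler);
        return new Unsubscriber(() => handlers.Remove(handler));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action remove;

        public Unsubscriber(Action remove) { this.remove = remove; }

        public void Dispose()
        {
            // Idempotent: the removal runs at most once.
            var action = remove;
            remove = null;
            if (action != null) action();
        }
    }
}

public static class TinyBusDemo
{
    // Returns how many messages the subscription received.
    public static int Run()
    {
        var bus = new TinyBus();
        var received = 0;
        var d = bus.Subscribe<string>(msg => received++);
        bus.Publish("first");  // delivered
        bus.Publish(42);       // ignored: not a string
        d.Dispose();
        bus.Publish("second"); // not delivered: subscription was removed
        return received;
    }
}
```

The point of the sketch is that the returned object is the only thing that knows how to remove the handler, so the subscriber does not need a reference to the handler list or a separate Unsubscribe method.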

The other overloads are variations on subscribing. The overload with the subscription customizer lets you change the rules by which you receive messages. So far it supports filtering and specifying that your subscription is called on the dispatcher thread, which makes it useful for UI operations.

The lines

bus.Subscribe<MessageB>(subscription, c=>c.SetFilter(msg=>msg.Id == "A"));
bus.Subscribe<MessageB>(subscription, c => c.DispatchOnUiThread());

show you the syntax.
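
Conceptually, a filter just decides whether the subscription gets called for a given message. The following sketch shows that idea in isolation. The WithFilter helper, the SubscriptionSketch and FilterDemo classes, and the shape of MessageB (a class with a string Id) are assumptions for illustration and say nothing about how MemBus applies filters internally.

```csharp
using System;

public static class SubscriptionSketch
{
    // Hypothetical helper: wraps a handler so that it only runs
    // when the predicate accepts the message.
    public static Action<M> WithFilter<M>(Action<M> subscription, Func<M, bool> filter)
    {
        return msg => { if (filter(msg)) subscription(msg); };
    }
}

// Assumed message shape, modelled on the msg.Id == "A" filter above.
public sealed class MessageB
{
    public string Id { get; set; }
}

public static class FilterDemo
{
    // Returns how many messages passed the filter.
    public static int Run()
    {
        var hits = 0;
        var filtered = SubscriptionSketch.WithFilter<MessageB>(
            msg => hits++,
            msg => msg.Id == "A");

        filtered(new MessageB { Id = "A" }); // passes the filter
        filtered(new MessageB { Id = "B" }); // rejected by the filter
        return hits;
    }
}
```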

I will say more about the ISubscriptionShaper overload when talking about the extension points of MemBus.

The Observe method opens up the new and wonderful world of the Reactive Framework. It returns an IObservable from which you receive push-based notifications of incoming instances of the desired type. Here's a small example that uses its filtering capabilities:

var messages = 
    from msg in bus.Observe<MessageA>() 
    where msg.Name == "A" 
    select msg;

using (messages.Subscribe(msg => msgCount++))
{
    bus.Publish(new MessageA {Name = "A"});
    bus.Publish(new MessageA {Name = "B"});
    bus.Publish(new MessageA {Name = "A"});
    msgCount.ShouldBeEqualTo(2);
}
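
If you wonder how a callback-style subscription relates to an IObservable, the sketch below adapts one to the other using only the interfaces from the System namespace. It is an illustration under assumptions, not the way MemBus implements Observe: CallbackObservable, CountingObserver, Nothing and ObserveDemo are hypothetical names.

```csharp
using System;

// Hypothetical adapter: turns "subscribe with a callback, get an IDisposable back"
// into an IObservable<M>.
public sealed class CallbackObservable<M> : IObservable<M>
{
    private readonly Func<Action<M>, IDisposable> subscribe;

    public CallbackObservable(Func<Action<M>, IDisposable> subscribe)
    {
        this.subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<M> observer)
    {
        // Every incoming message is pushed to the observer's OnNext.
        return subscribe(observer.OnNext);
    }
}

public static class ObserveDemo
{
    private sealed class CountingObserver : IObserver<string>
    {
        public int Count;
        public void OnNext(string value) { Count++; }
        public void OnError(Exception error) { }
        public void OnCompleted() { }
    }

    private sealed class Nothing : IDisposable
    {
        public void Dispose() { }
    }

    // Returns how many values reached the observer.
    public static int Run()
    {
        // Stand-in for a bus: remembers the callback so the demo can push values into it.
        Action<string> sink = null;
        var observable = new CallbackObservable<string>(handler =>
        {
            sink = handler;
            return new Nothing();
        });

        var observer = new CountingObserver();
        using (observable.Subscribe(observer))
        {
            sink("a");
            sink("b");
        }
        return observer.Count;
    }
}
```

With a bus at hand, the constructor argument could be something like handler => bus.Subscribe<MessageA>(handler), so that disposing the observable's subscription also disposes the underlying bus subscription.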
