Introduction to the Reactive Framework Part II

In my previous post, I talked a little about the parts of the Reactive Framework that are coming to the .NET 4 BCL, as well as their implementation in F# as part of F# First Class Events.  This time, let’s come back to the Reactive Framework itself, from which IObservable<T> and IObserver<T> originated.  As you may remember, you can play with the bits of the Reactive Framework from the Silverlight 3 Toolkit.

In the first post in the series, I gave an introduction to the basic problems we have with asynchronous and reactive programming.  We covered some of the evolution of the .NET framework with asynchronous programming and where the Reactive Framework fits.  This time, I’ll pick up where we left off to talk about one of the important ideas behind the Reactive Framework: the duality of Enumerable and Observable.

The Duality of Enumerable & Observable

During Erik Meijer’s appearances on Channel 9, an important and recurring theme was the mathematical duality between the Enumerable and the Observable.  What exactly does that mean?

Remember from the first post in the series that we talked about the pull (interactive) versus the push (reactive) model.  The pull model, represented by the iterator pattern of IEnumerable<T>/IEnumerator<T>, states that we must explicitly call a method in order to get each item from our abstracted collection.  On the other hand, our push model, represented by the observable pattern of IObservable<T>/IObserver<T>, states that we register an interest through a subscription, and items are subsequently handed to us from some abstracted collection.

First, let’s look at the iterator pattern, a common interactive programming pattern that’s used quite frequently inside .NET.  It consists of two interfaces, IEnumerable<T> and IEnumerator<T>.  Listed below are the highlights of these two interfaces, cut down to their essential bits.

public interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
}

public interface IEnumerator<T> : IDisposable
{
    T Current { get; }
    bool MoveNext();
}

The IEnumerable<T> interface exposes a single method which returns an IEnumerator<T> to iterate some object.  The IEnumerator<T> gives us the ability to advance to the next item, learning whether there is one, and to read the current item.
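To see the pull model in action, here is a minimal sketch that drives an enumerator by hand, which is roughly what a foreach loop does for us.  The PullDemo class and its Sum method are names I made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class PullDemo
{
    // Sums a sequence by asking the enumerator for each item in turn.
    public static int Sum(IEnumerable<int> source)
    {
        int total = 0;
        using (IEnumerator<int> e = source.GetEnumerator())
        {
            // The consumer is in control: nothing arrives until we ask.
            while (e.MoveNext())
            {
                total += e.Current;
            }
        }
        return total;
    }
}
```

Calling PullDemo.Sum(new[] { 1, 2, 3 }) walks the array one MoveNext at a time and returns 6.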

To think about basic asynchronous programming, we must consider a classic Gang of Four pattern, the Observer pattern.  This pattern describes an object, called an Observable, that maintains a list of its dependent Observers and notifies them automatically of any state changes.  In Java, this should be rather familiar territory with the java.util.Observer interface and the java.util.Observable class.  In the Reactive Framework, this approach is no different: an IObservable<T> maintains a list of dependent IObserver<T> instances and notifies them automatically of any state changes.  Let’s look at the signatures below:

public interface IObserver<T>
{
    void OnCompleted();
    void OnError(Exception exception);
    void OnNext(T value);
}

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
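To make the push side concrete, here is a minimal sketch of both interfaces implemented by hand.  RecordingObserver and ArrayObservable are hypothetical names of my own, not types from the Reactive Framework, and the observable here simply pushes a fixed set of values as soon as someone subscribes.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that writes down everything pushed at it.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(T value) { Log.Add("next:" + value); }
    public void OnError(Exception exception) { Log.Add("error:" + exception.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

// Hypothetical observable that pushes its items on subscribe, then completes.
public sealed class ArrayObservable<T> : IObservable<T>
{
    private readonly T[] items;

    public ArrayObservable(params T[] items) { this.items = items; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        // The producer is in control: the observer never asks for anything.
        foreach (T item in items) observer.OnNext(item);
        observer.OnCompleted();
        return new NothingToDispose();
    }

    // Everything was already pushed, so there is nothing left to cancel.
    private sealed class NothingToDispose : IDisposable
    {
        public void Dispose() { }
    }
}
```

Subscribing a RecordingObserver<int> to new ArrayObservable<int>(1, 2) leaves "next:1", "next:2" and "completed" in its Log, without the observer ever calling anything itself.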

What you may notice, and what you may have seen in some of Erik Meijer’s Channel 9 videos, is that there is a duality between the Enumerable and the Observable, given the signatures of these interfaces.  To explain what I mean, let’s first look at the flow of the Enumerator:

image

The GetEnumerator function takes no arguments and returns an IEnumerator<T>, which in turn exposes no-argument members, MoveNext and Current, that hand back the next item in the list.  Now, let’s look at the inverse of it.

image

The dual of the Enumerable solution is quite simple.  We have our observer on the left-hand side, which has an action function that takes a T argument and returns void.  The right-hand side is the notification that is sent when something happens.  This is all managed by the Observable, as it maintains a list of all observers so that it can notify them.

To make this a little more concrete, let’s mash these two sets of interfaces together in order to dualize them.  First, let’s start with the IEnumerator<T> interface.

public interface IEnumerator<T> : IDisposable
{
    T Current { get; } // throws exception
    bool MoveNext();
}

What we’ll notice is that we have two members: the Current property with a get accessor, and the MoveNext method, which takes nothing and returns a boolean that indicates whether it could move to the next item.  You’ll also notice that our Current property could throw an exception.  If we were in the Java world, we’d explicitly mark this interface with throws Exception, but as C# doesn’t have checked exceptions, this comment will do.  Now, let’s look to dualize this interface.

public interface IDualEnumerator<T>
{
    // Pseudocode: C# has no "value or exception" parameter type.
    void SetCurrent(T value | Exception ex);
    void MoveNext(bool canMove);
}

At first glance, all we have to do is flip our Current property to have a setter.  Because Current either yielded a value or threw an exception, that’s the way we’ll model it for now.  Secondly, our MoveNext will now take a boolean to indicate whether there is more to come.  Let’s refactor this a little bit more and break SetCurrent into two distinct operations, one for yielding the next value and one for throwing an exception.

public interface IDualEnumerator<T>
{
    void Yield(T value);
    void Throw(Exception ex);
    void MoveNext(bool canMove);
}

Now that we have that under control, let’s move on to our MoveNext method.  As it stands, we’d keep sending in true until we’re finished pushing values.  Instead of calling MoveNext repeatedly, let’s call it only once, when we’re finished.

public interface IDualEnumerator<T>
{
    void Yield(T value);
    void Throw(Exception ex);
    void Break();
}

What we then end up with is pretty much the signature of our IObserver<T>.  By renaming our Yield to OnNext, our Throw to OnError and our Break to OnCompleted, we’re left with our original IObserver<T> interface.

public interface IObserver<T>
{
    void OnNext(T value);
    void OnError(Exception exception);
    void OnCompleted();
}
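One way to convince yourself of this mapping is to replay a pull-based sequence as push notifications.  The sketch below is my own illustration, not Reactive Framework code: DualDemo, Push, Trace and OneThenFail are made-up names.  A value read from Current becomes OnNext, an exception becomes OnError, and MoveNext returning false becomes OnCompleted.

```csharp
using System;
using System.Collections.Generic;

public static class DualDemo
{
    // Walks a pull-based sequence and replays it as push notifications.
    public static void Push<T>(IEnumerable<T> source, IObserver<T> observer)
    {
        using (IEnumerator<T> e = source.GetEnumerator())
        {
            while (true)
            {
                T value;
                try
                {
                    if (!e.MoveNext()) break;  // "false" becomes OnCompleted below
                    value = e.Current;
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);      // a throw becomes OnError
                    return;
                }
                observer.OnNext(value);        // a value becomes OnNext
            }
        }
        observer.OnCompleted();
    }

    // Convenience for trying it out: records each notification as text.
    public static List<string> Trace<T>(IEnumerable<T> source)
    {
        var log = new List<string>();
        Push(source, new LogObserver<T>(log));
        return log;
    }

    // Hypothetical sequence that fails after its first item.
    public static IEnumerable<int> OneThenFail()
    {
        yield return 1;
        throw new InvalidOperationException("boom");
    }

    private sealed class LogObserver<T> : IObserver<T>
    {
        private readonly List<string> log;
        public LogObserver(List<string> log) { this.log = log; }
        public void OnNext(T value) { log.Add("OnNext(" + value + ")"); }
        public void OnError(Exception exception) { log.Add("OnError(" + exception.Message + ")"); }
        public void OnCompleted() { log.Add("OnCompleted"); }
    }
}
```

DualDemo.Trace(new[] { 1, 2 }) records OnNext(1), OnNext(2), OnCompleted, while DualDemo.Trace(DualDemo.OneThenFail()) records OnNext(1) followed by OnError(boom) and never completes.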

What about the dual of IEnumerable<T>?  Let’s first look at what that interface is.

public interface IEnumerable<T>
{
    IEnumerator<T> GetEnumerator();
}

This interface has a single method called GetEnumerator which returns an IEnumerator<T>.  How might we dualize this interface?  Well, we already know that the dual of IEnumerator<T> is IObserver<T>.  We need a way to attach interest in a given dualized enumerator, so we’ll create a method called Attach.  Likewise, once we attach interest, we also need a way to detach it for a given observer.  That might look like the following.

public interface IDualEnumerable<T>
{
    void Attach(IObserver<T> observer);
    void Detach(IObserver<T> observer);
}

If we look at the Java types for Observer and Observable, these are pretty close in signature.  Ultimately, though, there is a problem with this scenario in terms of composition.  At each level, we’d need to remember which observer maps to which source so that we could detach any of the observers if need be.  Not doing so could lead to space leaks and is generally a bad idea.  How could we solve this then?  We need a way to track our attachment through some form of return value from Attach.  At this point, the tracking is nothing more than the ability to detach our given observer.  Notice that our IEnumerator<T> interface inherits from IDisposable, which gets one thinking.  Could we instead change our Attach method so that it returns an IDisposable, which lets us track our subscription and clean up after we’re done?  That’s exactly the approach that IObservable<T> takes in the final version of the interface.

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
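Here is a rough sketch of how a Subscribe that hands back an IDisposable could replace the Attach/Detach pair.  SimpleSource, Publish, ObserverCount and ActionObserver are hypothetical names of mine, and the sketch ignores thread safety, errors and completion to stay short.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source: keeps a list of observers and hands each
// subscriber a token that removes it again.
public sealed class SimpleSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public int ObserverCount { get { return observers.Count; } }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);                    // the old Attach
        return new Subscription(this, observer);
    }

    // Pushes a value to everyone currently subscribed.
    public void Publish(T value)
    {
        // Copy first so observers may unsubscribe while being notified.
        foreach (IObserver<T> o in observers.ToArray()) o.OnNext(value);
    }

    private sealed class Subscription : IDisposable
    {
        private SimpleSource<T> owner;
        private readonly IObserver<T> observer;

        public Subscription(SimpleSource<T> owner, IObserver<T> observer)
        {
            this.owner = owner;
            this.observer = observer;
        }

        public void Dispose()
        {
            if (owner == null) return;              // already disposed
            owner.observers.Remove(observer);       // the old Detach
            owner = null;
        }
    }
}

// Hypothetical observer that forwards OnNext to a delegate.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception exception) { }
    public void OnCompleted() { }
}
```

The caller no longer needs to remember which observer went where: it just holds on to the returned IDisposable and calls Dispose when it loses interest, after which Publish no longer reaches that observer.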

Now that we’ve proven the duality, why does it matter?  Because of this duality, the laws that apply to LINQ to Objects now can apply to LINQ to Observables (or Events).  And in fact, we can see that through the Observable class which has many of our standard LINQ combinators such as:

  • Aggregate
  • First/FirstOrDefault
  • GroupBy
  • Join
  • Last/LastOrDefault
  • Select
  • SelectMany
  • Single/SingleOrDefault
  • Skip/SkipWhile
  • Take/TakeWhile
  • Where
  • Zip
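To give a feel for how one of these combinators can exist on the push side, here is a hand-rolled Select over IObservable<T>.  This is only a sketch of the idea and not the Reactive Framework’s implementation: ObservableSketch, DelegateObservable, DelegateObserver, FromArray, Collect and Demo are all names I invented, and the sketch skips the care a real library takes around exceptions thrown by the selector.

```csharp
using System;
using System.Collections.Generic;

public static class ObservableSketch
{
    // The name matters: the C# query syntax "from x in xs select f(x)"
    // compiles down to a call to a method named Select.
    public static IObservable<TResult> Select<T, TResult>(
        this IObservable<T> source, Func<T, TResult> selector)
    {
        return new DelegateObservable<TResult>(observer =>
            source.Subscribe(new DelegateObserver<T>(
                value => observer.OnNext(selector(value)),
                observer.OnError,
                observer.OnCompleted)));
    }

    // Pushes each item of an array, then completes.
    public static IObservable<T> FromArray<T>(params T[] items)
    {
        return new DelegateObservable<T>(observer =>
        {
            foreach (T item in items) observer.OnNext(item);
            observer.OnCompleted();
            return new EmptyDisposable();
        });
    }

    // Subscribes and gathers everything that gets pushed.
    public static List<T> Collect<T>(IObservable<T> source)
    {
        var results = new List<T>();
        source.Subscribe(new DelegateObserver<T>(results.Add, ex => { }, () => { }));
        return results;
    }

    // The same query shape we would write against an IEnumerable<int>.
    public static List<int> Demo()
    {
        var doubled = from x in FromArray(1, 2, 3) select x * 2;
        return Collect(doubled);
    }
}

public sealed class DelegateObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> subscribe;
    public DelegateObservable(Func<IObserver<T>, IDisposable> subscribe) { this.subscribe = subscribe; }
    public IDisposable Subscribe(IObserver<T> observer) { return subscribe(observer); }
}

public sealed class DelegateObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public DelegateObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception exception) { onError(exception); }
    public void OnCompleted() { onCompleted(); }
}

public sealed class EmptyDisposable : IDisposable
{
    public void Dispose() { }
}
```

ObservableSketch.Demo() returns 2, 4, 6, and the query expression inside it is character for character what we would write over a plain array with LINQ to Objects.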

Because of this duality, we can do such simple things as listen to the ProgressChanged event until the RunWorkerCompleted event happens.

var worker = new BackgroundWorker();

var percentage = from progress in worker.ProgressChangedEvent()
                     .Until(worker.RunWorkerCompletedEvent())
                 select progress.EventArgs.ProgressPercentage;

var subscription = percentage.Subscribe(
    p => Console.WriteLine("Percentage Complete: {0}", p));

That’s only scratching the surface of the power of the Reactive Framework.  We’ll go in depth in the next part of the series on the combinators.  We could go into the fact that the Enumerable solution is really just the lazy list monad and the Observable solution is really just the continuation monad, but that’s for another time as well.

Conclusion

As I’ve stated before, the LiveLabs Reactive Framework gives us the ability to harness reactive programming and to treat events as the first class citizens they should have been, using LINQ expressions and other standard LINQ combinators.  Still, there is much to cover in this series, including the standard LINQ combinators, the monadic heritage of this solution and more.
