Introduction to the Reactive Extensions for JavaScript – Creating Observers
Looking back to the previous post, we covered how we create observable sequences, the producers of our data. We have quite a number of ways of creating these outside of events which we covered earlier. Now that we have these observable sequences, now what? We need to address the consumer side of this producer/consumer story in the form of an observer.
Before we get started, let’s get caught up to where we are today:
Creating Observers
Let’s go back to the Observer pattern definition once again before we get started. The idea here is that we have an object, called the Observable (or Subject) which keeps a list of its dependents, the observers, and notifies each of them automatically of any state changes. In the case of the Reactive Extensions for JavaScript, we’re talking more about observable sequences. As we discussed last time, the Observer has three parts:
- OnNext – when a new value is produced
- OnError – when an exception occurs
- OnCompleted – when the observable sequence terminates
When creating an observer, we should take all three into account and how we’re going to handle them.
In order to attach these observers to our observable sequence, we can invoke the Subscribe method on our observable while passing in our observer. And when we’re no longer interested in the subscription to the observable sequence, we can detach by calling Dispose on the result of the Subscribe method.
New Observer via Create
Let’s get started in creating an Observer by looking at the Observer.Create method. This method takes in three functions, one for the OnNext, one for the OnError and finally one for the OnCompleted. This function returns to us an Observer which we can then use for subscribing.
Rx.Observer.Create( function(next) { ... }, // OnNext function(err) { ... }, // OnError function() { ... }) // OnCompleted );
Once we have an Observer, we can then attach to the Observable using the Subscribe method which takes our Observer. When we call Subscribe, we get back a disposable object with a single Dispose method which allows us to detach from the Observable.
Observable { Subscribe : function(observer) { ... } }
One of the best ways I find to explore a new API is to write tests to show the expected behavior. By writing these, I have a comprehensive view of what each method does, especially if the code didn’t come with the tests already. So, let’s create a few tests to show the behavior of creating an Observer and then subscribing to an Observable sequence. I’ll use QUnit to write my tests, and in particular, the asynchronous test feature due because we are testing asynchronous callbacks.
The first test will be to check the OnNext function parameter on Observable.Create. In this case, I’ll assert at the single value in my observable sequence is the value I receive when OnNext is invoked.
asyncTest("Observer should observe OnNext", function() { var observable = Rx.Observable.Return(0); var observer = Rx.Observer.Create( function(next) { equals(0, next); start(); }, function(err) { }, function() {}); observable.Subscribe(observer); });
The next test, I will make an example on how my OnError function parameter will work. In this case, I’ll have an Observer throw an exception via the Throw method and my OnError function check the message and assert that it’s the same as my error that I threw.
asyncTest("Observer should observe OnError", function() { var someError = "FAIL!"; var observable = Rx.Observable.Throw(someError); var observer = Rx.Observer.Create( function(next) { }, function(err) { equals(someError, err); start(); }, function() {}); observable.Subscribe(observer); });
Finally, in my last example, let’s create a simple test to show off the OnCompleted behavior. In order to do so we’ll create an empty observable which should not yield any values and instead only invoke the OnCompleted. Then we’ll create an Observer which has the test logic in the OnCompleted function parameter.
asyncTest("Observer should observe OnCompleted", function() { var observable = Rx.Observable.Empty(); var observer = Rx.Observer.Create( function(next) { }, function(err) { }, function() { ok(true, "True when invoked on complete"); start(); }); observable.Subscribe(observer); });
Creating Observers this way is good for reusability, especially if you wish to attach to any number of observable sequences. But, we’re not tied to having to create them via Create, there are other ways.
Overloading Subscribe
In addition to creating an Observer via the Create method, we also have shortcuts which allow us to create an Observer on the fly with the Subscribe method. In addition to the Subscribe which takes an Observer, we have three other overloads which can take functions for our OnNext, OnError and OnCompleted. The first overload takes a function for OnNext, where the next takes a function for OnNext and OnError, and finally the last overload takes functions for all three, the OnNext, OnError and OnCompleted.
Observable { Subscribe : function( function(next) { ... }) Subscribe : function( function(next) { ... }, function(err) { ... }) Subscribe : function( function(next) { ... }, function(err) { ... }, function() { ... }) }
This function, just as above, will create a disposable object for us which allows us to unsubscribe at any time via the Dispose method.
Unsubscribing
As I’ve stated earlier, one of the great things about the design of Rx for JavaScript is that it’s quite easy to both subscribe and unsubscribe from an observer. The design of Rx for JavaScript follows very closely to the design of Rx for .NET including subscribing and unsubscribing. Let’s step through an example of how we can use the Dispose method on our subscription. In this instance, we’ll have two observers, and after the first value has been produced, we unhook the first observer and continue listening on the second. We’ll assert that indeed the first has been unhooked while the second continues to listen.
asyncTest("Dispose should unhook observer", function() { var nextValue = 0; var observable = Rx.Observable.FromArray([1,2,3]); var disp1 = observable.Subscribe( function(next) { nextValue = next; }); var disp2 = observable.Subscribe( function(next) { disp1.Dispose(); equals(1, nextValue); start(); }); });
Such scenarios could be quite helpful in unhooking events when others happen, such as mouse events, keyboard or even AJAX requests. We’ll cover some of those scenarios in upcoming posts.
Conclusion
So, now we’ve covered the basics of creating Observable sequences and Observers and subscriptions. Now that we have some of the basics what else can we do with it? That’s where some of the LINQ combinators come in handy and we’ll pick that up next time.
This of course is only scratching the surface of what capabilities this library has and there is much more yet left to cover. The question you’re probably asking now is where can I get it? Well, for that you’ll have to stay tuned. I hope to have more announcements soon about its general availability.
What can I say? I love JavaScript and very much looking forward to the upcoming JSConf 2010 here in Washington, DC where the Reactive Extensions for JavaScript will be shown in its full glory with Jeffrey Van Gogh. For too many times, we’ve looked for the abstractions over the natural language of the web (HTML, CSS and JavaScript) and created monstrosities instead of embracing the web for what it is. With libraries such as jQuery and indeed the Reactive Extensions for JavaScript gives us better tools for dealing with the troubled child that is DOM manipulation and especially events.