Introduction to the Reactive Framework Part V
In the previous post of the Introduction to the Reactive Framework series, we covered how to create new IObservable<T> instances, either from scratch or from existing sequences. What this allowed us to do was turn an operation which was previously interactive, such as iterating over a collection, to a reactive, where I could have multiple listeners reacting to the same collection concurrently. This has nice implications for allowing us to scale our application in interesting ways. This time, let’s take another angle on making asynchronous programming easier.
Let’s get caught up to where we are today:
- Part I – Introduction
- Part II – Duality of Enumerable/Observable
- Part III – From Events to Observables
- Part IV – From Enumerables to Observables
From Asynchronous To Observables
In the first post in the series, we talked about some of the problems we face with asynchronous programming today. For example, for any given operation, how do we handle exceptions, or handle cancellation, or even manage the lifetime of resources? Let’s take a look at the story of asynchronous programming today. For example, let’s take the fairly straight forward approach of sending a tweet in Twitter, but with the added pressure of non-blocking calls for all IO operations. Now we took an example which should have been straight forward, and now it looks something like this anti-pattern:
var request = WebRequest.Create("http://twitter.com/statuses/update.xml"); request.Credentials = new NetworkCredential("foo", "bar"); request.Method = "POST"; request.BeginGetRequestStream(rsAr => { var requestStream = request.EndGetRequestStream(rsAr); var tweet = HttpUtility.HtmlEncode("status=Hello from an async method #bacon"); var bytes = Encoding.UTF8.GetBytes(tweet); requestStream.BeginWrite(bytes, 0, bytes.Length, beginWriteAr => { requestStream.EndWrite(beginWriteAr); request.BeginGetResponse(responseAr => { var response = request.EndGetResponse(responseAr); var responseStream = response.GetResponseStream(); var readBuffer = new byte[response.ContentLength]; responseStream.BeginRead(readBuffer, 0, readBuffer.Length, readAr => { var read = responseStream.EndRead(readAr); var readText = Encoding.UTF8.GetString(readBuffer); ResultsText.Text = readText; }, null); }, null); }, null); }, null);
Not only is this code an absolute mess, but it has a number of bad practices, including not handling cleanup of our resources, as well as not handling exceptions, cancellation checking, or even checking if our buffer was completely read. Adding those constraints to the above code would make it a bit more unreadable. The situation we run into is that programming in a non-blocking fashion has caused all sorts of heartache where we become so wrapped up in the non-blocking code that we lose sight easily of the domain logic we should have been interested in, in the first place.
I’ve covered in the past that with F# async workflows, solving this is a rather easy task and that kind of rich functionality has been unavailable to C# developers directly. But, with the Reactive Extensions for .NET, we have the ability to also take advantage of composable asynchronous actions as well. So, where do we get started?
Let’s take a simple example of creating a function and then invoking it asynchronously.
Func<int, int, int> add = (x, y) => x + y; add.BeginInvoke(3, 4, ar => { var result = add.EndInvoke(ar); Console.WriteLine(result); }, null);
Many asynchronous methods in .NET are written in this way that their signature has a BeginXXX and EndXXX where the XXX is the method name that is being executed asynchronously. The Begin takes the arguments to execute the method, an AsyncCallback which is an action that takes an IAsyncResult and returns nothing, and finally object state. The End method takes the IAsyncResult which is passed in from the AsyncCallback in order to retrieve the value of the asynchronous call.
void BeginXXX(args, AsyncCallback, state); Value EndXXX(IAsyncResult);
Luckily, the Reactive Extensions give us a couple of ways to make this happen. First, let’s take an existing method such as the System.IO.Stream.WriteAsync. In this example, we’ll create an extension method which allows us to turn this asynchronous method into a IObservable<T>.
public static IObservable<Unit> WriteAsync( this Stream stream, byte[] buffer, int offset, int count) { return Observable.FromAsyncPattern<byte[], int, int>( stream.BeginWrite, stream.EndWrite)(buffer, offset, count); }
Because the Write method returns void, we have issues in C# as we cannot have a generic type as being void. To get around this, much like F#, the Reactive Extensions introduced the Unit type to express that we have no value. Quite frankly this is the way I think C# should have been to allow for this and not need these unnecessary hacks. Anyways, the Observable class has a method called FromAsyncPattern which takes in both the BeginXXX and EndXXX and returns a Func delegate that takes the arguments required to execute it. In this case, the Observable.FromAsyncPattern returns a Func which takes the buffer, offset and count and returns an IObservable<Unit>.
There is also another way in that we can take our existing add function from above and using an extension method called ToAsync, can also work the IObservable magic.
Func<int, int, int> add = (x, y) => x + y; Func<int, int, IObservable<int>> obvervableAdd = add.ToAsync(); IObservable<int> result = from added in obvervableAdd(3, 4) select added; IDisposable sub = result .SubscribeOnDispatcher() .Subscribe(added => Results.Text = added.ToString());
In this instance, we took our existing add method and then turned it into an asynchronous method by using the ToAsync extension method. That returns a Func delegate which takes our two arguments and returns an IObservable<int> which contains our asynchronous result. We can then take our new observableAdd and execute it inside of a LINQ statement, then subscribe to it on the Dispatcher thread in order to rid ourselves of synchronization context issues in WPF.
Going back to our original example here, let’s see how we might follow some of the ideas presented here into creating a way to send a tweet completely in a LINQ statement.
var tweetObservable = from request in CreateWebRequest( "http://twitter.com/statuses/update.xml", userName, password, "POST") let tweet = HttpUtility.HtmlEncode("status= Hi! #bacon") let buffer = Encoding.UTF8.GetBytes(tweet) from _ in request.SetContentLength(buffer.Length) from requestStream in request.GetRequestStreamAsync() from __ in requestStream.WriteAsync(buffer, 0, buffer.Length) from response in request.GetResponseAsync() let responseStream = response.GetResponseStream() let reader = new StreamReader(responseStream) select reader.ReadToEnd(); var tweetsObservable = tweetObservable .Throttle(TimeSpan.FromMinutes(3)) .Repeat(25);
We can create extension methods to the WebRequest for GetRequestStream and GetResponse to enable observable behavior via the Begin/End methods that the WebRequest exposes. As well, we can extend the System.IO.Stream as well to both write and read streams asynchronously. We have other more trivial methods such as setting the content length and creating a web request as an observable as well, and in order to do that was covered in the previous post using the Observable.Create method. After we create this IObservable value, we can then throttle it to every three minutes and to repeat 25 times. One concern we might have is around the resource usage and ensuring the cleanup, and we’ll cover that next time.
Conclusion
As I’ve stated before, the LiveLabs Reactive Framework gives us the ability to harness reactive programming and treat events as the first class citizens they should have been using LINQ expressions and other standard LINQ combinators. Still, there is much to cover in this series with the standard LINQ combinators, the monadic heritage of this solution as well as managing resources, and turning Tasks Observables.