F# First Class Events – Async Workflows + Events Part III
So far in this series, I’ve covered a bit about what first class events are in F# and how you might use them. In the first post, we looked at what a first class events mean and some basic combinators in order to compose events together. In the second post, we looked at how we might create events and publish them to the world through classes. And in the third post I talked about how to manage the lifetime of a subscription. In the fourth installment, I corrected my usage of the old create function and instead to use the Event class to create, trigger and publish events. In the last part, we’ve been talking about asynchronous workflows and eventing together, and this time we’ll pick up on that discussion. Before we get started, let’s get caught up to where we are today.
- Part 1 – First Class Events
- Part 2 – Creating Events
- Part 3 – Creating and Disposing Handlers
- Part 4 – Changes on Creating Events
- Part 5 – Async Workflows + Events Part I
- Part 6 – Async Workflows + Events Part II
Converting Events to Asynchronous Operations
One of the last topics to be covered in this section involves the question, “How do we take an event and properly manage the cancellation and error checking?” Earlier I showed about how we could take the WebClient’s DownloadStringAsync event and partition it into three different events based upon the outcome of the main event. This time, we’re going to take that event and turn it into an asynchronous operation with proper error checking and cancellation checking.
In order to make this happen, we need to use the AwaitEvent extension method on the Async<T> class. Let’s take a first glance at how to do this with a BackgroundWorker class. This class allows us to do potentially expensive operation in the background and report progress along the way. Of interest to us is the RunWorkerCompleted event and the RunWorkerAsync method in trying to wrap that in such a way that we can check for errors and cancellations. First, let’s create an extension method to the BackgroundWorker class to allow for the Async<T> binding.
open System open System.ComponentModel type BackgroundWorker with member this.AsyncRunWorker(?argument:obj) = async { // Run the worker asynchronously let arg = defaultArg argument null this.RunWorkerAsync(arg) // Wait for the event to happen let! args = Async.AwaitEvent(this.RunWorkerCompleted, cancelAction=this.CancelAsync) // Base our result on what happened let result = if args.Cancelled then AsyncCanceled (new OperationCanceledException()) elif args.Error <> null then AsyncException args.Error else AsyncOk args.Result return! AsyncResult.Commit(result) }
What I’ve done is to create an extension method called AsyncRunWorker which has an optional argument to send to the worker. I first call the RunWorkerAsync with our argument (null if omitted), then I wait for the RunWorkerCompleted event to fire with our cancel action. This cancel action allows us to specify some behavior that happens should the operation somehow be canceled. Next, we base our result upon whether we have a cancellation, an error or lastly, we have a result. Then we commit that answer as our result. This function, when called, will produce and Async<obj> as that is the signature of the args.Result.
We can test the behavior of this by creating a simple worker and running it inside of an async workflow. In order to do so, we’ll put in a little bit of infrastructure that we had in a previous post talking about subscribing to events. This way, we can unsubscribe from our event automatically at the end of our async workflow through the use of a using scope.
[<AutoOpen>] module EventExtensions = open System type IDelegateEvent<'Del when 'Del :> Delegate > with member this.Subscribe(d) = this.AddHandler(d) { new IDisposable with member disp.Dispose() = this.RemoveHandler(d) }
After we’ve defined this, let’s go ahead and now define our async workflow with the BackgroundWorker defined. In this instance, we’ll create a simple background process that waits for a little bit and then formats the argument into a string as the result.
let workerResult (worker:BackgroundWorker) = async { use e = worker.DoWork.Subscribe(fun _ args -> Thread.Sleep(5000) args.Result <- sprintf "Hello %A" args.Argument) return! worker.AsyncRunWorker ("Matt") } |> Async.RunSynchronously
In this instance, we created a function which takes a BackgroundWorker and inside the async workflow, we subscribe to the DoWork event to sleep for a bit, and then format a string with the argument. At the end of the async block, I invoke the AsyncRunWorker extension method with my first name as an argument. Finally, I run it synchronously just to force the evaluation. Had I been using .NET 4.0, I could have used a Task<T> instead and invoked it as a future. I’ll cover how that works in a future post.
Now that we have our function defined for handling a BackgroundWorker inside the async workflow, let’s tie it all together. First, we’ll create the worker which supports cancellation. One item of note is that I included below some pragmas for fixing some thread blocking issues when using F# interactive around the use of SynchronizatonContexts and the Async.RunSynchronously. If this code is compiled and run as an executable, then it won’t be affected. I invoke the worker three times and print out the result of each.
let worker = new BackgroundWorker(WorkerSupportsCancellation = true) #if INTERACTIVE let context = SynchronizationContext.Current SynchronizationContext.SetSynchronizationContext(null) #endif workerResult worker |> printfn "%A" workerResult worker |> printfn "%A" workerResult worker |> printfn "%A" #if INTERACTIVE SynchronizationContext.SetSynchronizationContext(context) #endif do ()
It’s an interesting exercise, but how about something a bit more useful?
Extending WebClient
In this case, let’s look at extending the System.Net.WebClient to also support F# async workflows for such things as asynchronously opening a reader of a given URL. If you’ll look at the F# PowerPack library, it already has an extension method for asynchronously downloading a string, such as HTML from a website in WebClient.AsyncDownloadString. For my example, let’s try the same approach from above to apply it to our situation:
open System open System.IO open System.Net type WebClient with member this.AsyncOpenRead (address:Uri) : Async<Stream> = async { let userToken = new obj() this.OpenReadAsync(address, userToken) // Loop until we see a reply with the same token let rec loop() = async { let! args = Async.AwaitEvent(this.OpenReadCompleted, cancelAction=this.CancelAsync) if args.UserState <> userToken then return! loop() else let result = if args.Cancelled then AsyncCanceled (new OperationCanceledException()) elif args.Error <> null then AsyncException args.Error else AsyncOk args.Result return! AsyncResult.Commit(result) } return! loop() }
In this case, we’re going to create a token in which we can track our particular instance of the async operation. We call the OpenReadAsync method with our address and our token, and then we loop until we get the callback with our associated token. Once we do, then we set the result much as we did above. You’ll notice a familiar trend that most of the async operations on the WebClient can be extended the same way this one was, such as OpenWrite, DownloadXXX and UploadXXX.
We can now download data from a reader as we have in this example below:
open System open System.IO open System.Net open System.Threading let readHtml (address : Uri) = async { let wc = new WebClient() use! stream = wc.AsyncOpenRead(address) use reader = new StreamReader(stream) return! reader.AsyncReadToEnd() } #if INTERACTIVE let context = SynchronizationContext.Current SynchronizationContext.SetSynchronizationContext(null) #endif let html = readHtml(Uri "http://bing.com") |> Async.RunSynchronously #if INTERACTIVE SynchronizationContext.SetSynchronizationContext(context) #endif do()
So, as you can see from above, we simply read the HTML from a page using the WebClient approach instead of the WebRequest way using async workflows. Once again, we have to deal with the context issues only during F# Interactive.
Conclusion
From these two examples, we can see how you can take programming models built around events and turn them into async functions that can easily interact with F#’s async workflows. I think over this past series we’ve covered quite a bit dealing with first class events in F#. In the next series, we’ll start to look at the Reactive Framework and new ways of thinking around handling events.