Exploring the Reactive Framework (RX)
A few days ago, intentionally or not, a version of the Reactive Framework was released into the wild. Let’s see how we can use RX for computations on a stream of data. As an example, we’ll take a stream of ints and compute the average of each group of five.
Here’s the primary stream of numbers, created with the static Generate() method:
    Random rnd = new Random();
    var generatedNums = Observable.Generate<int, int>(
        0                                       // seed
        , d => true                             // condition to continue
        , d => d % 12                           // generated value
        , d => (int)(rnd.NextDouble() * 300)    // delay
        , d => d + 1                            // modify value for next iteration
    );
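To see which values this stream carries without waiting on the random delays, the same state transitions can be replayed with plain LINQ to Objects. This is only a sketch: `StreamPreview` and `FirstValues` are hypothetical names introduced here, and it models just the seed, the `d % 12` projection and the `d + 1` step, not the timing or any of the RX types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class StreamPreview
{
    // Hypothetical helper (not part of RX): replays the state transitions of the
    // Generate() call synchronously (seed 0, emit d % 12, then d + 1), ignoring the delay.
    public static List<int> FirstValues(int count)
    {
        return Enumerable.Range(0, count)     // states 0, 1, 2, ...
                         .Select(d => d % 12) // generated value for each state
                         .ToList();
    }
}
```

Calling `StreamPreview.FirstValues(15)` yields 0 through 11 followed by 0, 1, 2, so the values cycle every twelve items.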
And to consume the stream, add the values to an ObservableCollection:
    generatedNums
        .Post(sc)                               // move onto UI thread
        .Subscribe(num => Numbers.Add(num));    // add numbers to observable collection

Computing the average in groups of 5 turns out to be harder, as the Reactive FX doesn’t seem to have a GroupBy() method at this time. Here’s what I came up with:
    generatedNums
        .Grouper(a => a, (a, list) => list.Count() < 5)   // group into lists of 5, returning an IObservable<IEnumerable<int>>
        .Select(list => list.Average())                   // average each list: IObservable<IEnumerable<int>> becomes IObservable<double>
        .Post(sc)                                         // move onto UI thread
        .Subscribe(mean => Averages.Add(mean));           // add to observable collection

And the implementation for “Grouper()”:
    public static IObservable<IEnumerable<TResult>> Grouper<TSource, TResult>(
        this IObservable<TSource> source,
        Func<TSource, TResult> selector,
        Func<TSource, IEnumerable<TResult>, bool> grouper)
    {
        return new AnonymousObservable<IEnumerable<TResult>>(observer =>
        {
            var res = new List<TResult>();      // group currently being filled
            return source.Subscribe(
                x =>
                {
                    try
                    {
                        res.Add(selector(x));
                        // grouper returns true while the group should keep growing
                        if (!grouper(x, res))
                        {
                            observer.OnNext(res);
                            res = new List<TResult>();
                        }
                    }
                    catch (Exception exception)
                    {
                        observer.OnError(exception);
                    }
                },
                new Action<Exception>(observer.OnError),
                () =>
                {
                    if (res.Count > 0) observer.OnNext(res);   // flush a trailing partial group
                    observer.OnCompleted();
                });
        });
    }
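The grouping rule itself can be checked away from RX with an ordinary `IEnumerable<T>`. The sketch below is a pull-based analogue under stated assumptions, not part of the Reactive Framework: `PullGrouping`, `GroupWhile` and `AveragesOfFive` are hypothetical names, and the predicate keeps the same shape as the one passed to `Grouper()` (it returns true while the current list should keep growing).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PullGrouping
{
    // Hypothetical pull-based counterpart of Grouper(): each item is appended to
    // the current list, then keepGrowing(item, list) is consulted; when it
    // returns false the list is yielded and a new one is started.
    public static IEnumerable<List<T>> GroupWhile<T>(
        this IEnumerable<T> source,
        Func<T, IEnumerable<T>, bool> keepGrowing)
    {
        var current = new List<T>();
        foreach (var item in source)
        {
            current.Add(item);
            if (!keepGrowing(item, current))
            {
                yield return current;
                current = new List<T>();
            }
        }
        if (current.Count > 0)
            yield return current; // trailing partial group
    }

    // Averages of consecutive groups of five, mirroring the RX pipeline above.
    public static List<double> AveragesOfFive(IEnumerable<int> numbers)
    {
        return numbers.GroupWhile((n, list) => list.Count() < 5)
                      .Select(list => list.Average())
                      .ToList();
    }
}
```

For the first fifteen values of the stream (0 through 11, then 0, 1, 2), `PullGrouping.AveragesOfFive(Enumerable.Range(0, 15).Select(d => d % 12))` gives the averages 2, 7 and 4.8. An input shorter than five items still produces one average, because the trailing partial group is yielded.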