Sliding Windows via the Reactive Framework
A few months ago, while playing with CTP 2 of StreamInsight, I created a small VWAP demo on a sliding window. Now that a proper CTP of Rx is available, I wanted to see how much effort the same demo would take without the CEP infrastructure of StreamInsight. I’ll admit that this was a little harder to write than I expected, and there’s still at least one bug remaining (updated), but the code for actually computing the VWAPs feels much cleaner in the Rx version than it did in the StreamInsight version. The debuggability (which is really about transparency) of Rx is a welcome difference from most CEP systems.
So here’s the code:
The generation of stock ticks remained nearly identical. However, instead of timestamping by hand, I used the Timestamp() extension method. And to allow multiple observers on the same IObservable, the ticks are routed to a Subject.
    public IObservable<Timestamped<StockTick>> GetTicks()
    {
        var subj = new Subject<Timestamped<StockTick>>();
        var gen = Observable.Generate(
            0
            , ii => ii < 1000 // produce 1000 ticks
            , ii => new StockTick() // next value
            ...
            )
            .Timestamp();
        gen.Subscribe(tsst => subj.OnNext(tsst));
        return subj;
    }
Compute the VWAP on a 10-second sliding window, re-evaluated on a 500 ms heartbeat:
    public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
    {
        return oticks
            .ToSlidingWindow(new TimeSpan(0, 0, 0, 10), new TimeSpan(0, 0, 0, 0, 500))
            .Select(sl => sl.Current)
            .SelectMany(cur => cur.GroupBy(tsst => tsst.Value.Symbol)
                .Select(grp =>
                {
                    IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
                    var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
                    var totalVolume = ticks.Sum(tk => tk.Size);
                    return new VWAPItem(grp.Key, totalAmount, totalVolume, totalAmount / totalVolume);
                }));
    }
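To make the arithmetic inside the Select concrete, here is a minimal sketch of the per-symbol VWAP computation on a single window snapshot, using plain LINQ and no Rx. The Tick class and the VwapSketch.Compute helper are hypothetical stand-ins invented for this example (the real StockTick and VWAPItem types are not shown in this post), and the zero-volume guard is an addition of the sketch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for StockTick: only the members the calculation needs.
public sealed class Tick
{
    public Tick(string symbol, decimal price, long size)
    {
        Symbol = symbol;
        Price = price;
        Size = size;
    }

    public string Symbol { get; }
    public decimal Price { get; }
    public long Size { get; }
}

public static class VwapSketch
{
    // Maps each symbol to VWAP = sum(price * size) / sum(size)
    // over the ticks currently in the window.
    public static Dictionary<string, decimal> Compute(IEnumerable<Tick> window)
    {
        return window
            .GroupBy(t => t.Symbol)
            // Assumption of this sketch: skip groups with no volume
            // rather than divide by zero.
            .Where(g => g.Sum(t => t.Size) > 0)
            .ToDictionary(
                g => g.Key,
                g => g.Sum(t => t.Price * t.Size) / g.Sum(t => t.Size));
    }
}
```

For example, two ticks for one symbol at 10 for 100 shares and 20 for 300 shares give (10 * 100 + 20 * 300) / 400 = 17.5.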
And the code for ToSlidingWindow()
    public static IObservable<SlidingWindow<Timestamped<T>>> ToSlidingWindow<T>(
        this IObservable<Timestamped<T>> source, TimeSpan size, TimeSpan resolution)
    {
        Func<SlidingWindow<Timestamped<T>>, TimeoutJoinItem<T>, SlidingWindow<Timestamped<T>>> windowing =
            (window, item) =>
            {
                Func<Timestamped<T>, bool> checkTimestamp =
                    cwi => cwi.Timestamp.Add(size) <= item.ComparisonTimestamp;
                var newCurrent = window.Current.SkipWhile(checkTimestamp);
                var removed = window.Current.TakeWhile(checkTimestamp);
                var added = Enumerable.Repeat(item.TSItem, (item.IsTimeout) ? 0 : 1);
                return new SlidingWindow<Timestamped<T>>(newCurrent.Concat(added), added, removed);
            };

        DateTime priorleft = DateTime.MinValue;
        return source.CombineLatest(Observable.Timer(resolution, resolution).Timestamp(),
            (left, right) =>
            {
                bool isTimeout = left.Timestamp == priorleft;
                priorleft = left.Timestamp;
                return new TimeoutJoinItem<T>(left, (isTimeout) ? right.Timestamp : left.Timestamp, isTimeout);
            })
            .Scan(new SlidingWindow<Timestamped<T>>(), windowing)
            .Where(sl => sl.Added.Count() > 0 || sl.Removed.Count() > 0);
    }
The key elements in the above are:
- Observable.Timer – this is our heartbeat, which allows us to detect the passage of time without new events
- CombineLatest – joins two IObservables: the data stream and the time stream
- Scan – this is Aggregate() for Observables, except that it emits every intermediate result – the windowing function takes the current window and computes the new window based on which elements have expired and which have been added
- Where – finally, reduces noise by removing SlidingWindows which have not changed
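The accumulator step that Scan applies can be tried out without Rx. The following is a rough sketch, not the code from this post: WindowStepSketch, Step and Fold are hypothetical names, the window holds bare DateTime values instead of Timestamped<T>, and the window is materialized with ToList() on every step, which the original does not do. The expiry predicate is the same one used in checkTimestamp. LINQ's Aggregate returns only the final window, whereas Scan would emit each intermediate one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class WindowStepSketch
{
    // One accumulator step: drop entries that are at least `size` old
    // relative to `now`, then append the new entry if there is one.
    // A null `added` models a heartbeat with no new data.
    // Entries are assumed to be in timestamp order.
    public static List<DateTime> Step(
        List<DateTime> current, DateTime now, TimeSpan size, DateTime? added)
    {
        // Same predicate as checkTimestamp; ToList() evaluates it eagerly
        // (a choice of this sketch).
        var next = current.SkipWhile(ts => ts.Add(size) <= now).ToList();
        if (added.HasValue)
        {
            next.Add(added.Value);
        }
        return next;
    }

    // Folds a sequence of arrivals into the final window contents,
    // treating each arrival time as the comparison timestamp.
    public static List<DateTime> Fold(IEnumerable<DateTime> arrivals, TimeSpan size)
    {
        return arrivals.Aggregate(
            new List<DateTime>(),
            (window, ts) => Step(window, ts, size, ts));
    }
}
```

With a 10-second window and arrivals at 0, 4 and 11 seconds, the fold ends with the entries at 4 and 11 seconds. A later heartbeat at 15 seconds, passed to Step with a null `added`, expires the 4-second entry and leaves only the 11-second one.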
The code to wire it up to a window is standard stuff, just:
    var ticks = _model.GetTicks();
    ticks
        .ObserveOnDispatcher()
        .Subscribe(tst => TickCollection.Add(tst));

    var vwapDict = new Dictionary<string, VWAPItem>();
    _model.GetVWAPWS(ticks)
        .ObserveOnDispatcher()
        .Subscribe(vwap =>
        {
            if (vwapDict.ContainsKey(vwap.Symbol))
                VWAPCollection.Remove(vwapDict[vwap.Symbol]);
            vwapDict[vwap.Symbol] = vwap;
            VWAPCollection.Add(vwap);
        });
And of course the required screenshot