CEP-Style Sliding Windows in Rx – Take 2
The bug I mentioned in my first attempt at a sliding window was a minor one: the aggregates never went down to 0, even after the window had emptied out.
The problem line of code was cur.GroupBy(tsst => tsst.Value.Symbol). If the window is empty, there is nothing to group, so no aggregate is computed for a symbol whose ticks have all expired.
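Here’s the fix: remember every symbol that has ever appeared in a window, then outer join that set against the current groups, so that a symbol with no ticks left gets a zeroed VWAPItem: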
public IObservable<VWAPItem> GetVWAPWS(IObservable<Timestamped<StockTick>> oticks)
{
    // Every symbol seen so far; the dictionary is only used as a set of keys.
    var existingWindows = new ConcurrentDictionary<string, int>();

    return oticks
        .ToSlidingWindow(new TimeSpan(0, 0, 0, 30), new TimeSpan(0, 0, 0, 0, 500))
        .Select(sl => sl.Current)
        .SelectMany(cur =>
        {
            // Per-symbol aggregates for the ticks currently in the window.
            IEnumerable<VWAPItem> grouped = cur.GroupBy(tsst => tsst.Value.Symbol)
                .Select(grp =>
                {
                    IEnumerable<StockTick> ticks = grp.Select(tsst2 => tsst2.Value);
                    var totalAmount = ticks.Sum(tk => tk.Size * tk.Price);
                    var totalVolume = ticks.Sum(tk => tk.Size);
                    return new VWAPItem(grp.Key, totalAmount, totalVolume, totalAmount / totalVolume);
                });

            // Record the symbols present in this window.
            foreach (var grpd in grouped)
            {
                existingWindows[grpd.Symbol] = 1;
            }

            // Outer join: known symbols with no group in this window get a zeroed item.
            IEnumerable<IEnumerable<VWAPItem>> outerJoin = existingWindows
                .GroupJoin(grouped,
                    key => key.Key,
                    grped => grped.Symbol,
                    (key, item) => item.DefaultIfEmpty(new VWAPItem(key.Key, 0, 0, 0)));

            return outerJoin.SelectMany(x => x);
        });
}
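To see the outer join in isolation, here is a minimal sketch in plain LINQ to Objects, with no Rx involved. Tick, Vwap, OuterJoinSketch and Aggregate are hypothetical stand-ins rather than the types used above, a HashSet replaces the ConcurrentDictionary for brevity, and tick sizes are assumed to be positive so the division is safe.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-ins for StockTick and VWAPItem, reduced to what the join needs.
public record Tick(string Symbol, decimal Price, int Size);
public record Vwap(string Symbol, decimal Amount, int Volume, decimal Price);

public static class OuterJoinSketch
{
    // Aggregates one window of ticks and emits a zeroed row for every
    // previously seen symbol that has no ticks in this window.
    public static List<Vwap> Aggregate(ISet<string> seenSymbols, IEnumerable<Tick> window)
    {
        // Per-symbol aggregates; an empty window yields an empty list here.
        List<Vwap> grouped = window
            .GroupBy(t => t.Symbol)
            .Select(g =>
            {
                decimal amount = g.Sum(t => t.Size * t.Price);
                int volume = g.Sum(t => t.Size); // assumed > 0
                return new Vwap(g.Key, amount, volume, amount / volume);
            })
            .ToList();

        // Remember every symbol that has appeared so far.
        foreach (var item in grouped)
        {
            seenSymbols.Add(item.Symbol);
        }

        // Left outer join: seen symbols on the left, current aggregates on the right.
        return seenSymbols
            .GroupJoin(grouped,
                sym => sym,
                v => v.Symbol,
                (sym, matches) => matches.DefaultIfEmpty(new Vwap(sym, 0m, 0, 0m)))
            .SelectMany(x => x)
            .ToList();
    }

    public static void Main()
    {
        var seen = new HashSet<string>();
        var full = Aggregate(seen, new[] { new Tick("MSFT", 10m, 2), new Tick("MSFT", 20m, 2) });
        var empty = Aggregate(seen, Array.Empty<Tick>());
        Console.WriteLine(full[0]);
        Console.WriteLine(empty[0]);
    }
}
```

The first call aggregates the two MSFT ticks. The second call passes an empty window, so GroupBy produces no groups, but MSFT is still in the set of seen symbols and the join falls back to the zeroed row.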