Implementing a custom RSS/Atom adapter with Microsoft StreamInsight
In a previous post we explained the programming model of Microsoft StreamInsight's adapter framework. The fundamental capability of this framework is to streamline the flow of events into and out of the StreamInsight hosting application. One of the main advantages of this model is that it enables developers to create their own adapters that can be reused in StreamInsight-based solutions. In this post we will explore the details of implementing an RSS/Atom adapter using StreamInsight's adapter framework.
Syndication formats such as Atom and RSS have been at the forefront of the Web 2.0 movement. Nowadays, more and more systems are choosing these formats as a mechanism to expose data to external applications. The initial use of RSS and Atom was mainly focused on read-only scenarios. However, the emergence of standards such as the Atom Publishing Protocol enables applications to publish and edit feed entries at specific endpoints.
From the Complex Event Processing (CEP) perspective, syndication formats can be an interesting mechanism for both accessing and publishing the events processed in a CEP application. If you live in the CEP world, the idea of using text-based data sources such as web pages or flat files should be pretty familiar to you. Recently we have seen some of the CEP market leaders start embracing syndication formats as a mechanism for representing events. As an example, StreamBase (currently one of the top CEP vendors in the market) recently announced its support for processing Twitter streams as part of CEP applications.
For simplicity, we decided to implement an input-only adapter that reads an RSS feed and generates a set of events. The first step in our implementation is to define the event type that describes the syndication items. We can achieve that by using the class definition illustrated in the following code.
using System;
using System.ServiceModel.Syndication;

public class SyndicationEvent
{
    private string id;
    private string title;
    private string summary;
    private string text;
    private string uri;
    private DateTime createdTime;
    private DateTime lastUpdatedTime;
    private string author;

    public SyndicationEvent(SyndicationItem item)
    {
        id = item.Id;

        // Title and description are optional in RSS 2.0, so guard against null.
        title = (item.Title != null) ? item.Title.Text : String.Empty;
        summary = (item.Summary != null && item.Summary.Text != null)
            ? item.Summary.Text : String.Empty;

        // Keep at most the first 256 characters of the summary.
        if (summary.Length > 256)
            summary = summary.Substring(0, 256);

        createdTime = item.PublishDate.DateTime;
        lastUpdatedTime = item.LastUpdatedTime.DateTime;

        // SyndicationLink does not override ToString(), so read its Uri property.
        if (item.Links.Count > 0)
            uri = item.Links[0].Uri.ToString();
        if (item.Authors.Count > 0)
            author = item.Authors[0].Name;
    }

    public SyndicationEvent()
    { }

    public string ID
    {
        get { return id; }
        set { id = value; }
    }

    public string Title
    {
        get { return title; }
        set { title = value; }
    }

    public string Summary
    {
        get { return summary; }
        set { summary = value; }
    }

    public string Text
    {
        get { return text; }
        set { text = value; }
    }

    public string Uri
    {
        get { return uri; }
        set { uri = value; }
    }

    public string Author
    {
        get { return author; }
        set { author = value; }
    }

    public DateTime CreatedTime
    {
        get { return createdTime; }
        set { createdTime = value; }
    }

    public DateTime LastUpdatedTime
    {
        get { return lastUpdatedTime; }
        set { lastUpdatedTime = value; }
    }
}
As you might have noticed, our sample syndication event is initialized directly from a System.ServiceModel.Syndication.SyndicationItem object.
Given the time-based nature of syndication feeds, we decided to implement our sample adapter as an interval adapter by deriving from the TypedIntervalInputAdapter<T> class. Basically, an interval adapter produces events that are valid during a specific time interval, defined by a start time and an end time. You can find more details in my previous post.
For the sake of this example, our adapter reads the syndication feed once, when the adapter is constructed, rather than polling it periodically. To implement this, we leverage the programming model included in the System.ServiceModel.Syndication namespace, as illustrated in the following code.
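The interval idea can be made concrete with a small, self-contained model in plain C#. It is not the StreamInsight API: the `EventLifetime` struct and its members are hypothetical, and treating a lifetime as the half-open range [Start, End) is an assumption of this sketch. Under that convention an event whose end equals its start would be valid at no instant at all, so the model rejects it.

```csharp
using System;

// Hypothetical model of an interval event's lifetime (not a StreamInsight type).
// The lifetime is treated as the half-open range [Start, End).
public struct EventLifetime
{
    public readonly DateTime Start;
    public readonly DateTime End;

    public EventLifetime(DateTime start, DateTime end)
    {
        // A lifetime must have positive length: End has to be later than Start.
        if (end <= start)
            throw new ArgumentException("End must be later than Start.");
        Start = start;
        End = end;
    }

    // True when the event is valid at the given instant.
    public bool IsActiveAt(DateTime t)
    {
        return t >= Start && t < End;
    }

    // True when the two lifetimes share at least one instant.
    public bool Overlaps(EventLifetime other)
    {
        return Start < other.End && other.Start < End;
    }
}
```

In this model a one-tick lifetime (`End = Start.AddTicks(1)`) is the shortest one accepted, and it is active only at its start instant; two intervals that merely touch (one ends where the other starts) do not overlap.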
public class RSS20InputAdapter : TypedIntervalInputAdapter<SyndicationEvent>
{
    private IntervalEvent<SyndicationEvent> pendingevent;
    private DateTime pendingctitime;

    SyndicationFeed dataFeed;
    IEnumerator<SyndicationItem> dataFeedItems;

    int index = 0;

    public RSS20InputAdapter(SyndicationAdapterConfig configInfo,
        CepEventType cepEventType)
    {
        // Download and parse the RSS 2.0 feed once; the reader is disposed
        // even if parsing fails.
        Rss20FeedFormatter rssFormatter = new Rss20FeedFormatter(
            typeof(SyndicationFeed));
        using (XmlReader rssReader = XmlReader.Create(configInfo.URL))
        {
            rssFormatter.ReadFrom(rssReader);
        }
        dataFeed = rssFormatter.Feed;
        dataFeedItems = dataFeed.Items.GetEnumerator();
    }

    ....
}
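The constructor above relies on Rss20FeedFormatter, so it only understands RSS 2.0. Since this post is about RSS/Atom feeds, it may help to know that System.ServiceModel.Syndication also offers the static SyndicationFeed.Load(XmlReader) method, which reads either an RSS 2.0 or an Atom 1.0 document. The sketch below parses both formats from in-memory strings. The `FeedLoading` class and its helpers are hypothetical, and compiling it assumes a reference to the assembly or package that provides System.ServiceModel.Syndication. The FirstLink helper also shows that a link's address is exposed through SyndicationLink.Uri.

```csharp
using System;
using System.IO;
using System.Linq;
using System.ServiceModel.Syndication;
using System.Xml;

// Hypothetical helpers illustrating SyndicationFeed.Load.
public static class FeedLoading
{
    // Parses an RSS 2.0 or Atom 1.0 document held in a string.
    // SyndicationFeed.Load works out which of the two formats it is reading.
    public static SyndicationFeed LoadFromString(string xml)
    {
        using (XmlReader reader = XmlReader.Create(new StringReader(xml)))
        {
            return SyndicationFeed.Load(reader);
        }
    }

    // Titles of all items, in document order.
    public static string[] Titles(SyndicationFeed feed)
    {
        return feed.Items.Select(i => i.Title.Text).ToArray();
    }

    // Address of the first link of the first item, read from SyndicationLink.Uri.
    public static string FirstLink(SyndicationFeed feed)
    {
        SyndicationItem first = feed.Items.First();
        return first.Links.Count > 0 ? first.Links[0].Uri.ToString() : null;
    }
}
```

Replacing the formatter in the adapter's constructor with SyndicationFeed.Load would be one way to accept Atom feeds as well; the adapter presented here does not do this.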
The StreamInsight runtime invokes the adapter's Start method to begin producing events.
public override void Start()
{
    ProduceEvents();
}

private void ProduceEvents()
{
    while (true)
    {
        // The server asked the adapter to stop: release any retained event.
        if (AdapterState.Stopping == AdapterState)
        {
            PrepareToStop(pendingevent);
            pendingevent = null;
            Stopped();
            return;
        }

        // Retry the event rejected before the last Ready()/Resume() cycle;
        // otherwise create an event from the next syndication item.
        IntervalEvent<SyndicationEvent> currevent =
            (null != pendingevent) ? pendingevent : CreateEventFromSource();
        pendingevent = null;

        // No more syndication items: enqueue a final CTI and stop.
        if (null == currevent)
        {
            EnqueueCtiEvent(DateTime.MaxValue);
            Stopped();
            return;
        }

        // Enqueue interval event with payload.
        EnqueueOperationResult result = Enqueue(ref currevent);

        // Handle Enqueue rejection: keep the event, signal Ready(),
        // and continue from the pending event when Resume() is called.
        if (EnqueueOperationResult.Full == result)
        {
            PrepareToResume(currevent);
            Ready();
            return;
        }
    }
}
Notice that the adapter enqueues one event per syndication item until there are no more syndication entries available. After that, the adapter enqueues a CTI (current time increment) event with a timestamp of DateTime.MaxValue, indicating that no subsequent INSERT events can have a start time earlier than the timestamp of the CTI event.
The ProduceEvents method is also invoked from the Resume method.
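The CTI guarantee can be illustrated with a tiny in-memory model. This is a hypothetical sketch, not StreamInsight code: `CtiGuard` only records the latest CTI timestamp and rejects any later insert whose start time falls before it, which is the rule stated above. Enqueuing a CTI at DateTime.MaxValue therefore closes the stream to further inserts.

```csharp
using System;

// Hypothetical model of the CTI rule; not a StreamInsight type.
// After a CTI with timestamp T, inserts that start before T are rejected.
public class CtiGuard
{
    private DateTime latestCti = DateTime.MinValue;

    public int AcceptedCount { get; private set; }

    // Records a CTI. An earlier CTI adds no new guarantee, so keep the latest one.
    public void EnqueueCti(DateTime timestamp)
    {
        if (timestamp > latestCti)
            latestCti = timestamp;
    }

    // Accepts an insert unless it starts earlier than the latest CTI.
    public bool TryInsert(DateTime startTime)
    {
        if (startTime < latestCti)
            return false;
        AcceptedCount++;
        return true;
    }
}
```

One design consequence: RSS feeds commonly list the newest items first, so issuing a CTI at each item's start time would make every older item that follows violate the CTI. A single CTI at DateTime.MaxValue after the last item avoids that ordering problem.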
public override void Resume()
{
    ProduceEvents();
}
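StreamInsight calls Resume only after the adapter has called Ready(), typically because an earlier Enqueue call reported that the server could not accept more events. The sketch below models that handshake in plain C# so it can run without StreamInsight; `BoundedSink`, `ResumableProducer` and `EnqueueResult` are hypothetical stand-ins, not StreamInsight types. The point it illustrates is that the rejected item is kept and retried after the resume instead of being dropped.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the server's input queue and its Enqueue result;
// these are not StreamInsight types.
public enum EnqueueResult { Success, Full }

public class BoundedSink
{
    private readonly int capacity;
    private readonly Queue<string> buffer = new Queue<string>();
    public readonly List<string> Consumed = new List<string>();

    public BoundedSink(int capacity) { this.capacity = capacity; }

    public EnqueueResult Enqueue(string item)
    {
        if (buffer.Count >= capacity)
            return EnqueueResult.Full;
        buffer.Enqueue(item);
        return EnqueueResult.Success;
    }

    // Simulates the server consuming everything buffered so far.
    public void Drain()
    {
        while (buffer.Count > 0)
            Consumed.Add(buffer.Dequeue());
    }
}

// Producer that keeps a rejected item and retries it on the next call,
// mirroring the pendingevent / Ready() / Resume() pattern.
public class ResumableProducer
{
    private readonly IEnumerator<string> source;
    private string pending;

    public ResumableProducer(IEnumerable<string> items)
    {
        source = items.GetEnumerator();
    }

    // Returns true when the source is exhausted, or false when the sink was full
    // (the point at which an adapter would call Ready() and wait for Resume()).
    public bool Produce(BoundedSink sink)
    {
        while (true)
        {
            string current;
            if (pending != null)
            {
                current = pending;
                pending = null;
            }
            else if (source.MoveNext())
                current = source.Current;
            else
                return true;

            if (sink.Enqueue(current) == EnqueueResult.Full)
            {
                pending = current;
                return false;
            }
        }
    }
}
```

With a capacity of two and five items, three Produce calls separated by Drain calls deliver all five items in order. Treating a full queue as a reason to stop, instead of waiting for the resume, would discard every item that had not yet been accepted.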
The rest of the implementation follows the same pattern as most adapters, and it mainly manages the adapter's state lifecycle. The following code illustrates the complete implementation of our sample RSS adapter.
using System;
using System.Collections.Generic;
using System.ServiceModel.Syndication;
using System.Xml;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;

public class RSS20InputAdapter : TypedIntervalInputAdapter<SyndicationEvent>
{
    private IntervalEvent<SyndicationEvent> pendingevent;
    private DateTime pendingctitime;

    SyndicationFeed dataFeed;
    IEnumerator<SyndicationItem> dataFeedItems;

    int index = 0;

    /// <summary>
    /// Constructor - Use this to initialize local resources based on configInfo
    /// structures. Examples are open files or database connections, set delimiters, and
    /// other parameters that enable the adapter to access the input/output device.
    /// </summary>
    public RSS20InputAdapter(SyndicationAdapterConfig configInfo, CepEventType cepEventType)
    {
        // Download and parse the RSS 2.0 feed once; the reader is disposed
        // even if parsing fails.
        Rss20FeedFormatter rssFormatter = new Rss20FeedFormatter(typeof(SyndicationFeed));
        using (XmlReader rssReader = XmlReader.Create(configInfo.URL))
        {
            rssFormatter.ReadFrom(rssReader);
        }
        dataFeed = rssFormatter.Feed;
        dataFeedItems = dataFeed.Items.GetEnumerator();
    }

    /// <summary>
    /// Start is the first method to be called by the CEP server once the input
    /// adapter has been instantiated. So any initializations that cannot be covered
    /// in the constructor can be placed here. Start() is called in a separate worker
    /// thread initiated by the server. In this example (and in most scenarios), Start
    /// begins producing events once it is called.
    /// </summary>
    public override void Start()
    {
        ProduceEvents();
    }

    /// <summary>
    /// Resume is called by the server once it returns from being scheduled away
    /// from Start, and only after the adapter has called Ready(). Resume continues
    /// to produce events.
    /// </summary>
    public override void Resume()
    {
        ProduceEvents();
    }

    /// <summary>
    /// Dispose is inherited from the base adapter class and is the placeholder to
    /// release adapter resources when this instance is shut down.
    /// </summary>
    /// <param name="disposing"></param>
    protected override void Dispose(bool disposing)
    {
    }

    /// <summary>
    /// Main driver to read events from the source and enqueue them.
    /// </summary>
    private void ProduceEvents()
    {
        while (true)
        {
            // The server asked the adapter to stop: release any retained event.
            if (AdapterState.Stopping == AdapterState)
            {
                PrepareToStop(pendingevent);
                pendingevent = null;
                Stopped();
                return;
            }

            // Retry the event rejected before the last Ready()/Resume() cycle;
            // otherwise create an event from the next syndication item.
            IntervalEvent<SyndicationEvent> currevent =
                (null != pendingevent) ? pendingevent : CreateEventFromSource();
            pendingevent = null;

            // No more syndication items: enqueue a final CTI and stop.
            if (null == currevent)
            {
                EnqueueCtiEvent(DateTime.MaxValue);
                Stopped();
                return;
            }

            // Enqueue interval event with payload.
            EnqueueOperationResult result = Enqueue(ref currevent);

            // Handle Enqueue rejection: keep the event, signal Ready(),
            // and continue from the pending event when Resume() is called.
            if (EnqueueOperationResult.Full == result)
            {
                PrepareToResume(currevent);
                Ready();
                return;
            }
        }
    }

    private void PrepareToStop(IntervalEvent<SyndicationEvent> currEvent)
    {
        // The server will not accept any more events, and you
        // cannot do anything about it. Release the event.
        // If you miss this step, server memory will leak.
        if (null != currEvent)
        {
            ReleaseEvent(ref currEvent);
        }
    }

    private void PrepareToResume(IntervalEvent<SyndicationEvent> currevent)
    {
        // Keep the rejected event so ProduceEvents can retry it after Resume().
        pendingevent = currevent;
    }

    private void PrepareToResume(DateTime currctitime)
    {
        pendingctitime = currctitime;
    }

    private bool EndofSource()
    {
        return false;
    }

    private IntervalEvent<SyndicationEvent> CreateEventFromSource()
    {
        if (dataFeedItems.MoveNext())
        {
            SyndicationItem item = dataFeedItems.Current;
            IntervalEvent<SyndicationEvent> syndicationEvent = this.CreateInsertEvent();
            syndicationEvent.Payload = new SyndicationEvent(item);

            // Use the publish date, normalized to UTC, as the start time.
            syndicationEvent.StartTime = item.PublishDate.UtcDateTime;
            // An interval event's end time must be later than its start time,
            // so give each item a one-tick lifetime.
            syndicationEvent.EndTime = item.PublishDate.UtcDateTime.AddTicks(1);
            return syndicationEvent;
        }
        else
            return null;
    }
}

// Configuration structure to initialize the adapter.
public struct SyndicationAdapterConfig
{
    private string url;

    public string URL
    {
        get { return url; }
        set { url = value; }
    }
}

// Factory class is the entry point for the query binder to initialize
// and create an adapter instance.
public class SyndicationAdapterFactory : IInputAdapterFactory<SyndicationAdapterConfig>
{
    public SyndicationAdapterFactory()
    {
    }

    public InputAdapterBase Create(SyndicationAdapterConfig configInfo,
        EventShape eventshape, CepEventType cepeventtype)
    {
        InputAdapterBase adapter = default(InputAdapterBase);

        if (EventShape.Interval == eventshape)
            adapter = new RSS20InputAdapter(configInfo, cepeventtype);

        return adapter;
    }

    public void Dispose()
    {
    }
}
Although the adapter is fully functional, we still need to enable the factory mechanism that StreamInsight applications use to create instances of the adapter. We can accomplish this by writing a custom adapter factory that implements the Microsoft.ComplexEventProcessing.Adapters.IInputAdapterFactory<T> interface.
public class SyndicationAdapterFactory :
    IInputAdapterFactory<SyndicationAdapterConfig>
{
    public SyndicationAdapterFactory()
    {
    }

    public InputAdapterBase Create(SyndicationAdapterConfig configInfo,
        EventShape eventshape, CepEventType cepeventtype)
    {
        InputAdapterBase adapter = default(InputAdapterBase);

        if (EventShape.Interval == eventshape)
            adapter = new RSS20InputAdapter(configInfo, cepeventtype);

        return adapter;
    }

    public void Dispose()
    {
    }
}
At this point, our sample adapter can be used from any StreamInsight application. For instance, the following code illustrates a sample application that uses our adapter to query Microsoft's Bing News feed, filtering for items whose titles contain a specific keyword. The events that match the filter criteria are written to the console using the TextFileOutputAdapter sample included in the StreamInsight SDK.
private static void RssAdapterTest(string keyword)
{
    Server cepServer = Server.Create();
    Application cepApp = cepServer.CreateApplication("SampleCEPApplication");

    // Register the input and output adapter factories under distinct names.
    InputAdapter syndicationInputAdapter =
        cepApp.CreateInputAdapter<SyndicationAdapterFactory>("syndicationadapter", "");
    OutputAdapter fileOutputAdapter =
        cepApp.CreateOutputAdapter<TextFileOutputFactory>("textfileadapter", "");
    CepEventType syndicationEventType = cepApp.CreateEventType<SyndicationEvent>();

    CepStream<SyndicationEvent> dataFeedStream =
        CepStream.CreateInputStream<SyndicationEvent>("datafeedstream");

    // Keep only the items whose title contains the keyword.
    var query = from s1 in dataFeedStream
                where s1.Title.Contains(keyword)
                select new { Title = s1.Title, Time = s1.LastUpdatedTime,
                             Summary = s1.Summary };

    QueryTemplate template = cepApp.CreateQueryTemplate("samplequery", query);
    QueryBinder binder = new QueryBinder(template);
    var dataFeedInputConf = new SyndicationAdapterConfig
    {
        URL = "http://www.bing.com/news?FORM=Z9LH6&format=rss"
    };

    // No output file name: the results are written to the console.
    var outputConfig = new TextFileOutputConfig
    {
        OutputFileName = String.Empty,
        Delimiter = '\t',
        AdapterStopSignal = "StopAdapter"
    };

    // Bind the input stream with the SyndicationEvent payload type.
    binder.BindInputStream<SyndicationEvent, SyndicationAdapterConfig>("datafeedstream",
        syndicationInputAdapter, EventShape.Interval, dataFeedInputConf);
    binder.AddQueryConsumer<TextFileOutputConfig>("queryresult",
        fileOutputAdapter, outputConfig, EventShape.Interval,
        StreamEventOrder.FullyOrdered);

    Query cepQuery = cepApp.CreateQuery("samplequery", binder, "");
    // Named event matching the AdapterStopSignal configured for the output adapter.
    EventWaitHandle adapterStopSignal = new EventWaitHandle(false,
        EventResetMode.ManualReset, "StopAdapter");
    cepQuery.Start();

    adapterStopSignal.WaitOne();
    Console.ReadLine();
    cepQuery.Stop();
}
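The filter in this application is ordinary LINQ syntax. As a hedged illustration of what the query computes, the sketch below runs the same where/select shape with LINQ to Objects over an in-memory list rather than over a StreamInsight CepStream. The `FeedItem` type, the `KeywordFilter` class and the sample titles are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the SyndicationEvent fields the query reads.
public class FeedItem
{
    public string Title { get; set; }
    public string Summary { get; set; }
    public DateTime LastUpdatedTime { get; set; }
}

public static class KeywordFilter
{
    // Same where/select shape as the StreamInsight query, run with LINQ to Objects:
    // keep items whose title contains the keyword, project Title/Time/Summary,
    // and return just the titles so the result is easy to inspect.
    public static List<string> MatchingTitles(IEnumerable<FeedItem> items, string keyword)
    {
        var query = from s1 in items
                    where s1.Title.Contains(keyword)
                    select new { Title = s1.Title, Time = s1.LastUpdatedTime,
                                 Summary = s1.Summary };
        return query.Select(r => r.Title).ToList();
    }
}
```

Here string.Contains is an ordinal, case-sensitive comparison, so the keyword "bing" would not match a title containing "Bing". This sketch does not establish how StreamInsight itself evaluates Contains inside a stream query.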