Implementing a custom RSS/Atom adapter with Microsoft StreamInsight

In a previous post we explained the programming model of Microsoft's StreamInsight adapter framework. The fundamental purpose of this framework is to streamline the flow of events into and out of a StreamInsight hosting application. One of the main advantages of this model is that it enables developers to create their own adapters that can be reused across StreamInsight-based solutions. In this post we will explore the details of implementing an RSS/Atom adapter using StreamInsight's adapter framework.

Syndication formats such as Atom and RSS have been at the forefront of the Web 2.0 movement. More and more systems are choosing these formats as a mechanism for exposing data to external applications. RSS and Atom were initially used mainly in read-only scenarios; however, the emergence of standards such as the Atom Publishing Protocol enables applications to publish content to specific endpoints.

From the Complex Event Processing (CEP) perspective, syndication formats can be an interesting mechanism both for accessing and for publishing the events processed by a CEP application. If you live in the CEP world, the idea of using text-based data sources such as web pages or flat files should be familiar to you. Recently, some of the CEP market leaders have started embracing syndication formats as a mechanism for representing events. For example, StreamBase (currently one of the top CEP vendors in the market) recently announced its support for processing Twitter streams as part of CEP applications.

For simplicity, we decided to implement an input-only adapter that reads an RSS feed and generates a set of events. The first step in our implementation is to define the event type that describes a syndication item, as illustrated in the following code.

    using System;
    using System.ServiceModel.Syndication;

    public class SyndicationEvent
    {
        private const int MaxSummaryLength = 256;

        private string id;
        private string title;
        private string summary;
        private string text;
        private string uri;
        private DateTime createdTime;
        private DateTime lastUpdatedTime;
        private string author;

        public SyndicationEvent(SyndicationItem item)
        {
            id = item.Id;

            // Title and description are optional in RSS 2.0, so guard against null.
            title = item.Title != null ? item.Title.Text : String.Empty;
            string fullSummary = item.Summary != null ? item.Summary.Text : String.Empty;

            // Keep at most 256 characters of the summary.
            summary = fullSummary.Length > MaxSummaryLength
                ? fullSummary.Substring(0, MaxSummaryLength)
                : fullSummary;

            createdTime = item.PublishDate.DateTime;

            // RSS 2.0 items have no last-updated element of their own,
            // so fall back to the publish date when none was read.
            lastUpdatedTime = item.LastUpdatedTime == default(DateTimeOffset)
                ? createdTime
                : item.LastUpdatedTime.DateTime;

            // SyndicationLink does not override ToString(), so read its Uri.
            if (item.Links.Count > 0)
                uri = item.Links[0].Uri.ToString();

            // The RSS 2.0 <author> element holds an e-mail address, which the
            // RSS 2.0 formatter stores in Email rather than Name.
            if (item.Authors.Count > 0)
                author = item.Authors[0].Name ?? item.Authors[0].Email;
        }

        public SyndicationEvent()
        { }

        public string ID
        {
            get { return id; }
            set { id = value; }
        }

        public string Title
        {
            get { return title; }
            set { title = value; }
        }

        public string Summary
        {
            get { return summary; }
            set { summary = value; }
        }

        public string Text
        {
            get { return text; }
            set { text = value; }
        }

        public string Uri
        {
            get { return uri; }
            set { uri = value; }
        }

        public string Author
        {
            get { return author; }
            set { author = value; }
        }

        public DateTime CreatedTime
        {
            get { return createdTime; }
            set { createdTime = value; }
        }

        public DateTime LastUpdatedTime
        {
            get { return lastUpdatedTime; }
            set { lastUpdatedTime = value; }
        }
    }

As you might have noticed, our sample syndication event is initialized directly from a System.ServiceModel.Syndication.SyndicationItem object.
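
To show that mapping without depending on System.ServiceModel.Syndication, the sketch below pulls the same fields out of raw RSS 2.0 <item> elements with LINQ to XML. RssItemSketch and RssMappingSketch are illustrative names, and LINQ to XML is a swapped-in technique rather than what SyndicationItem does internally; the sketch only mirrors the rules the event class applies: optional title and description, the first link, and a 256-character cap on the summary.

```csharp
using System;
using System.Linq;
using System.Xml.Linq;

// Hypothetical, BCL-only sketch: NOT the System.ServiceModel.Syndication API.
// It mirrors how SyndicationEvent maps an item onto a flat payload.
public sealed class RssItemSketch
{
    public string Title { get; set; }
    public string Summary { get; set; }
    public string Uri { get; set; }
}

public static class RssMappingSketch
{
    public const int MaxSummaryLength = 256;

    public static RssItemSketch[] ReadItems(string rssXml)
    {
        XDocument doc = XDocument.Parse(rssXml);

        // RSS 2.0 elements are not namespace-qualified, so plain names work.
        return doc.Descendants("item")
                  .Select(item => new RssItemSketch
                  {
                      // <title> and <description> are optional in RSS 2.0;
                      // the string conversion yields null for a missing element.
                      Title = (string)item.Element("title") ?? string.Empty,
                      Summary = Truncate((string)item.Element("description") ?? string.Empty,
                                         MaxSummaryLength),
                      // <link> may be absent too; Uri stays null in that case.
                      Uri = (string)item.Element("link")
                  })
                  .ToArray();
    }

    // Keeps at most maxLength characters, like the Substring(0, 256) guard.
    public static string Truncate(string text, int maxLength)
    {
        return text.Length > maxLength ? text.Substring(0, maxLength) : text;
    }
}
```

In the real adapter the same values come from SyndicationItem.Title.Text, SyndicationItem.Summary.Text and SyndicationItem.Links, as the constructor above shows.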

Given the time-based nature of syndication feeds, we decided to implement our sample adapter as an interval adapter by deriving from the TypedIntervalInputAdapter<T> class. An interval adapter produces events that are valid during a specific time interval, delimited by a start time and an end time. You can find more details in my previous post.
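
To make interval semantics concrete, here is a hypothetical IntervalSketch type (our own name, not StreamInsight's IntervalEvent<T>) that models a lifetime as the half-open window [StartTime, EndTime). It also insists that EndTime be strictly later than StartTime: under half-open semantics, an interval whose end equals its start would cover no time at all.

```csharp
using System;

// Hypothetical model of an interval-event lifetime; not a StreamInsight type.
public readonly struct IntervalSketch
{
    public DateTime StartTime { get; }
    public DateTime EndTime { get; }

    public IntervalSketch(DateTime startTime, DateTime endTime)
    {
        // A zero-length or inverted interval would never be active.
        if (endTime <= startTime)
            throw new ArgumentException("EndTime must be later than StartTime.", nameof(endTime));
        StartTime = startTime;
        EndTime = endTime;
    }

    // Half-open window: active at StartTime, no longer active at EndTime.
    public bool IsActiveAt(DateTime t) => t >= StartTime && t < EndTime;

    // Two intervals overlap when each one starts before the other ends.
    public bool Overlaps(IntervalSketch other) =>
        StartTime < other.EndTime && other.StartTime < EndTime;
}
```

With this model, two back-to-back intervals such as 12:00-13:00 and 13:00-14:00 do not overlap, because 13:00 belongs only to the second one.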

For the sake of this example, our adapter reads the syndication feed once, when it is constructed, rather than polling it repeatedly. To implement this, we leverage the programming model included in the System.ServiceModel.Syndication namespace, as illustrated in the following code.

    public class RSS20InputAdapter : TypedIntervalInputAdapter<SyndicationEvent>
    {
        private IntervalEvent<SyndicationEvent> pendingevent;

        private SyndicationFeed dataFeed;
        private IEnumerator<SyndicationItem> dataFeedItems;

        public RSS20InputAdapter(SyndicationAdapterConfig configInfo,
                                 CepEventType cepEventType)
        {
            // Download and parse the whole RSS 2.0 feed once, up front.
            Rss20FeedFormatter rssFormatter = new Rss20FeedFormatter(typeof(SyndicationFeed));
            using (XmlReader rssReader = XmlReader.Create(configInfo.URL))
            {
                rssFormatter.ReadFrom(rssReader);
            }
            dataFeed = rssFormatter.Feed;
            dataFeedItems = dataFeed.Items.GetEnumerator();
        }

        ....
    }
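
Because the constructor reads the feed only once, items published afterwards are never seen. If you extended the adapter to re-read the feed periodically, each read would return many items that were already enqueued. The following is a minimal sketch of one way to skip them, assuming item IDs (the RSS <guid>, surfaced as SyndicationItem.Id) stay stable between reads; SeenItemFilter is a hypothetical helper, not part of StreamInsight or System.ServiceModel.Syndication.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: remembers item IDs across polls and passes through
// only the ones it has not seen before.
public sealed class SeenItemFilter
{
    private readonly HashSet<string> seenIds = new HashSet<string>(StringComparer.Ordinal);

    // Returns the IDs not returned by any earlier call, in input order.
    public List<string> TakeNew(IEnumerable<string> itemIds)
    {
        var fresh = new List<string>();
        foreach (string id in itemIds)
        {
            // HashSet.Add returns false when the ID is already present,
            // which also drops duplicates within a single poll.
            if (seenIds.Add(id))
                fresh.Add(id);
        }
        return fresh;
    }
}
```

Note that the set grows with every new item; a long-running poller would need some eviction policy, which this sketch leaves out.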

When a query that uses the adapter starts, the StreamInsight runtime invokes the adapter's Start method; our implementation responds by producing events.

    public override void Start()
    {
        ProduceEvents();
    }

    private void ProduceEvents()
    {
        IntervalEvent<SyndicationEvent> currevent;

        while (true)
        {
            if (AdapterState.Stopping == AdapterState)
            {
                // Release any parked event before acknowledging the stop.
                PrepareToStop(pendingevent);
                pendingevent = null;
                Stopped();
                return;
            }

            // Offer again the event the engine rejected last time, if any.
            if (null != pendingevent)
            {
                currevent = pendingevent;
                pendingevent = null;
            }
            else
            {
                currevent = CreateEventFromSource();
            }

            // No more syndication items: close the stream and stop.
            if (null == currevent)
            {
                EnqueueCtiEvent(DateTime.MaxValue);
                Stopped();
                return;
            }

            // Enqueue the interval event with its payload.
            EnqueueOperationResult result = Enqueue(ref currevent);

            // The engine is full: keep the event, signal Ready() and
            // wait for the engine to call Resume().
            if (EnqueueOperationResult.Full == result)
            {
                PrepareToResume(currevent);
                Ready();
                return;
            }
        }
    }

Notice that the adapter enqueues one event per syndication item until no more entries are available. At that point it enqueues a CTI (current time increment) event stamped with DateTime.MaxValue. A CTI guarantees that no subsequent INSERT event will have a start time earlier than the CTI's timestamp, so a CTI at DateTime.MaxValue tells the engine that no further events will arrive.
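
The CTI rule itself fits in a few lines. The sketch below is a hypothetical checker (CtiRuleChecker is our name, not a StreamInsight type) that tracks the latest CTI timestamp and reports whether an INSERT event with a given start time would still be allowed.

```csharp
using System;

// Hypothetical model of the CTI rule; not part of the StreamInsight API.
public sealed class CtiRuleChecker
{
    private DateTime boundary = DateTime.MinValue;

    // Records a CTI. The boundary only moves forward, so an earlier
    // timestamp leaves it unchanged.
    public void Cti(DateTime timestamp)
    {
        if (timestamp > boundary)
            boundary = timestamp;
    }

    // An INSERT is allowed only if it starts at or after the latest CTI.
    public bool CanInsert(DateTime startTime) => startTime >= boundary;
}
```

One consequence worth noting: many feeds list their newest items first, so issuing a CTI at each item's publish date as it is enqueued would turn every older item that follows into a violation. Issuing a single CTI at DateTime.MaxValue after the last item avoids that ordering problem.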

The ProduceEvents operation is also invoked as part of the Resume method.

    public override void Resume()
    {
        ProduceEvents();
    }
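
Resume matters when the engine pushes back. When Enqueue returns EnqueueOperationResult.Full, the adapter is expected to keep the rejected event, call Ready() and return; the engine later calls Resume, which offers the parked event first. The self-contained simulation below models only that hand-off with plain collections. BoundedSink and PendingProducer are hypothetical stand-ins, not StreamInsight types.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical consumer that accepts a limited number of items, standing in
// for an engine that answers "full".
public sealed class BoundedSink
{
    private int room;
    public List<int> Accepted { get; } = new List<int>();

    public BoundedSink(int room) { this.room = room; }

    public bool TryEnqueue(int item)
    {
        if (room == 0) return false;   // "full": the caller must keep the item
        room--;
        Accepted.Add(item);
        return true;
    }

    // Simulates the consumer draining and making room again.
    public void AddRoom(int slots) { room += slots; }
}

// Hypothetical producer that parks a rejected item instead of dropping it.
public sealed class PendingProducer
{
    private readonly IEnumerator<int> source;
    private readonly BoundedSink sink;
    private bool hasPending;
    private int pending;

    public PendingProducer(IEnumerable<int> items, BoundedSink sink)
    {
        source = items.GetEnumerator();
        this.sink = sink;
    }

    // Returns true once the source is exhausted, false if it had to pause.
    // Call it once to "start" and again to "resume".
    public bool Produce()
    {
        while (true)
        {
            int current;
            if (hasPending)
            {
                current = pending;      // offer the parked item first
                hasPending = false;
            }
            else if (source.MoveNext())
            {
                current = source.Current;
            }
            else
            {
                return true;            // end of source
            }

            if (!sink.TryEnqueue(current))
            {
                pending = current;      // analogous to PrepareToResume
                hasPending = true;
                return false;           // analogous to Ready(); return;
            }
        }
    }
}
```

The key design point is that nothing is lost between the pause and the resume: the rejected item is delivered first, so the output order matches the source order.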

The rest of the implementation follows the pattern common to most StreamInsight adapters and mainly manages the adapter's state lifecycle. The following code shows the complete implementation of our sample RSS adapter.

    using System;
    using System.Collections.Generic;
    using System.ServiceModel.Syndication;
    using System.Xml;
    using Microsoft.ComplexEventProcessing;
    using Microsoft.ComplexEventProcessing.Adapters;

    public class RSS20InputAdapter : TypedIntervalInputAdapter<SyndicationEvent>
    {
        // Event the engine rejected with EnqueueOperationResult.Full; it is
        // offered again first when the engine calls Resume().
        private IntervalEvent<SyndicationEvent> pendingevent;

        private SyndicationFeed dataFeed;
        private IEnumerator<SyndicationItem> dataFeedItems;

        /// <summary>
        /// Constructor - Use this to initialize local resources based on configInfo
        /// structures. Examples are open files or database connections, set delimiters, and
        /// other parameters that enable the adapter to access the input/output device.
        /// </summary>
        public RSS20InputAdapter(SyndicationAdapterConfig configInfo, CepEventType cepEventType)
        {
            // Download and parse the whole RSS 2.0 feed once, up front.
            Rss20FeedFormatter rssFormatter = new Rss20FeedFormatter(typeof(SyndicationFeed));
            using (XmlReader rssReader = XmlReader.Create(configInfo.URL))
            {
                rssFormatter.ReadFrom(rssReader);
            }
            dataFeed = rssFormatter.Feed;
            dataFeedItems = dataFeed.Items.GetEnumerator();
        }

        /// <summary>
        /// Start is the first method to be called by the CEP server once the input
        /// adapter has been instantiated. So any initializations that cannot be covered
        /// in the constructor can be placed here. Start() is called in a separate worker
        /// thread initiated by the server. In this example (and in most scenarios), Start
        /// begins producing events once it is called.
        /// </summary>
        public override void Start()
        {
            ProduceEvents();
        }

        /// <summary>
        /// Resume is called by the server once it returns from being scheduled away
        /// from Start, and only after the adapter has called Ready(). Resume continues
        /// to produce events.
        /// </summary>
        public override void Resume()
        {
            ProduceEvents();
        }

        /// <summary>
        /// Dispose is inherited from the base adapter class and is the placeholder to
        /// release adapter resources when this instance is shut down.
        /// </summary>
        /// <param name="disposing">True when called from Dispose() rather than a finalizer.</param>
        protected override void Dispose(bool disposing)
        {
            if (disposing && null != dataFeedItems)
            {
                dataFeedItems.Dispose();
            }
        }

        /// <summary>
        /// Main driver to read events from the source and enqueue them.
        /// </summary>
        private void ProduceEvents()
        {
            IntervalEvent<SyndicationEvent> currevent;

            while (true)
            {
                if (AdapterState.Stopping == AdapterState)
                {
                    // Release any parked event before acknowledging the stop.
                    PrepareToStop(pendingevent);
                    pendingevent = null;
                    Stopped();
                    return;
                }

                // Offer again the event the engine rejected last time, if any.
                if (null != pendingevent)
                {
                    currevent = pendingevent;
                    pendingevent = null;
                }
                else
                {
                    currevent = CreateEventFromSource();
                }

                // No more syndication items: close the stream and stop.
                if (null == currevent)
                {
                    EnqueueCtiEvent(DateTime.MaxValue);
                    Stopped();
                    return;
                }

                // Enqueue the interval event with its payload.
                EnqueueOperationResult result = Enqueue(ref currevent);

                // The engine is full: keep the event, signal Ready() and
                // wait for the engine to call Resume().
                if (EnqueueOperationResult.Full == result)
                {
                    PrepareToResume(currevent);
                    Ready();
                    return;
                }
            }
        }

        private void PrepareToStop(IntervalEvent<SyndicationEvent> currEvent)
        {
            // The server will not accept any more events, and you
            // cannot do anything about it. Release the event.
            // If you miss this step, server memory will leak.
            if (null != currEvent)
            {
                ReleaseEvent(ref currEvent);
            }
        }

        private void PrepareToResume(IntervalEvent<SyndicationEvent> currevent)
        {
            pendingevent = currevent;
        }

        private IntervalEvent<SyndicationEvent> CreateEventFromSource()
        {
            if (!dataFeedItems.MoveNext())
                return null;

            SyndicationItem item = dataFeedItems.Current;
            IntervalEvent<SyndicationEvent> syndicationEvent = CreateInsertEvent();
            syndicationEvent.Payload = new SyndicationEvent(item);
            syndicationEvent.StartTime = item.PublishDate.DateTime;
            // An interval event must end after it starts; a one-tick
            // lifetime is the shortest valid interval.
            syndicationEvent.EndTime = item.PublishDate.DateTime.AddTicks(1);
            return syndicationEvent;
        }
    }

    // Configuration structure to initialize the adapter.
    public struct SyndicationAdapterConfig
    {
        private string url;

        public string URL
        {
            get { return url; }
            set { url = value; }
        }
    }

    // Factory class is the entry point for the query binder to initialize
    // and create an adapter instance.
    public class SyndicationAdapterFactory : IInputAdapterFactory<SyndicationAdapterConfig>
    {
        public SyndicationAdapterFactory()
        {
        }

        public InputAdapterBase Create(SyndicationAdapterConfig configInfo,
            EventShape eventshape, CepEventType cepeventtype)
        {
            InputAdapterBase adapter = default(InputAdapterBase);

            // This adapter only supports interval events.
            if (EventShape.Interval == eventshape)
                adapter = new RSS20InputAdapter(configInfo, cepeventtype);

            return adapter;
        }

        public void Dispose()
        {
        }
    }

Although the adapter is now fully functional, we still need to provide the factory that StreamInsight applications use to create instances of the adapter. We accomplish this with a custom adapter factory that implements the Microsoft.ComplexEventProcessing.Adapters.IInputAdapterFactory<T> interface.

    public class SyndicationAdapterFactory : IInputAdapterFactory<SyndicationAdapterConfig>
    {
        public SyndicationAdapterFactory()
        {
        }

        public InputAdapterBase Create(SyndicationAdapterConfig configInfo,
            EventShape eventshape, CepEventType cepeventtype)
        {
            InputAdapterBase adapter = default(InputAdapterBase);

            if (EventShape.Interval == eventshape)
                adapter = new RSS20InputAdapter(configInfo, cepeventtype);

            return adapter;
        }

        public void Dispose()
        {
        }
    }

At this point, our sample adapter can be used from any StreamInsight application. For instance, the following code illustrates a sample application that uses our adapter to query Microsoft's Bing News feed for items whose title contains a specific keyword. The events that match the filter criteria are written to the console by the TextFileOutputAdapter sample included in the StreamInsight SDK.

    private static void RssAdapterTest(string keyword)
    {
        using (Server cepServer = Server.Create())
        {
            Application cepApp = cepServer.CreateApplication("SampleCEPApplication");

            // Register the input and output adapters under distinct names.
            InputAdapter syndicationInputAdapter =
                cepApp.CreateInputAdapter<SyndicationAdapterFactory>("syndicationadapter", "");
            OutputAdapter fileOutputAdapter =
                cepApp.CreateOutputAdapter<TextFileOutputFactory>("textfileadapter", "");
            CepEventType syndicationEventType = cepApp.CreateEventType<SyndicationEvent>();

            CepStream<SyndicationEvent> dataFeedStream =
                CepStream.CreateInputStream<SyndicationEvent>("datafeedstream");

            // Keep only the items whose title contains the keyword.
            var query = from s1 in dataFeedStream
                        where s1.Title.Contains(keyword)
                        select new { Title = s1.Title, Time = s1.LastUpdatedTime,
                                     Summary = s1.Summary };

            QueryTemplate template = cepApp.CreateQueryTemplate("samplequery", query);
            QueryBinder binder = new QueryBinder(template);
            var dataFeedInputConf = new SyndicationAdapterConfig
            {
                URL = "http://www.bing.com/news?FORM=Z9LH6&format=rss"
            };

            var outputConfig = new TextFileOutputConfig
            {
                OutputFileName = String.Empty,
                Delimiter = '\t',
                AdapterStopSignal = "StopAdapter"
            };

            // The payload type must match the stream's event type (SyndicationEvent).
            binder.BindInputStream<SyndicationEvent, SyndicationAdapterConfig>("datafeedstream",
                syndicationInputAdapter, EventShape.Interval, dataFeedInputConf);
            binder.AddQueryConsumer<TextFileOutputConfig>("queryresult",
                fileOutputAdapter, outputConfig, EventShape.Interval,
                StreamEventOrder.FullyOrdered);

            Query cepQuery = cepApp.CreateQuery("samplequery", binder, "");
            EventWaitHandle adapterStopSignal = new EventWaitHandle(false,
                EventResetMode.ManualReset, "StopAdapter");
            cepQuery.Start();

            // Block until the output adapter signals the named "StopAdapter" event.
            adapterStopSignal.WaitOne();
            Console.ReadLine();
            cepQuery.Stop();
        }
    }
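
The query itself is ordinary LINQ, so its filter logic can be tried without a StreamInsight server. The sketch below runs the same where/select over an in-memory array with LINQ to Objects; FeedItem and KeywordFilterSketch are hypothetical names. In this in-memory version, String.Contains performs an ordinal, case-sensitive comparison, so a keyword of "Bing" does not match a title containing only "bing".

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the SyndicationEvent payload.
public sealed class FeedItem
{
    public string Title { get; set; }
    public string Summary { get; set; }
    public DateTime LastUpdatedTime { get; set; }
}

public static class KeywordFilterSketch
{
    // Same filter and projection as the StreamInsight query, but over a
    // plain IEnumerable instead of a CepStream.
    public static List<string> MatchingTitles(IEnumerable<FeedItem> items, string keyword)
    {
        var query = from s1 in items
                    where s1.Title.Contains(keyword)   // ordinal, case-sensitive
                    select new { Title = s1.Title, Time = s1.LastUpdatedTime,
                                 Summary = s1.Summary };
        return query.Select(r => r.Title).ToList();
    }
}
```

If case-insensitive matching is wanted, normalizing both sides (for example with ToUpperInvariant) before calling Contains is one straightforward option.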
