Processing events from multiple sources using Microsoft StreamInsight
One of the fundamental patterns of Complex Event Processing (CEP) applications is the ability to process events from various input sources and distribute the results to multiple outputs. These operations require a high degree of coordination, which makes them particularly difficult to implement in real-world scenarios. Why is that? For starters, continuously querying data from multiple sources requires some degree of parallelism in the CEP application, and parallel processing techniques typically introduce challenges in error handling and availability. This complexity increases in CEP scenarios whose queries combine events from multiple sources that are being produced in parallel. The following figure illustrates that scenario.
Figure: CEP concurrency scenario
Microsoft StreamInsight provides an elegant model for expressing this type of concurrency scenario via LINQ queries. StreamInsight hides the complexity of concurrently aggregating events from multiple sources behind the LINQ join and union primitives. The StreamInsight runtime and adapters handle all of the coordination between the different event sources internally.
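To give a rough intuition for the union primitive, the following sketch merges two event sources into a single sequence ordered by start time. It is plain LINQ to Objects, not the StreamInsight API; the `Reading` record and `UnionSketch.Union` method are hypothetical names used only for illustration, and a real StreamInsight union also carries event lifetimes and CTIs, which this sketch ignores.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical event record used only for this sketch
public record Reading(DateTime Start, string SensorId, int Temperature);

public static class UnionSketch
{
    // Concatenates two event sequences and orders the result by start time,
    // approximating what a union of two streams delivers downstream.
    public static List<Reading> Union(IEnumerable<Reading> a, IEnumerable<Reading> b) =>
        a.Concat(b).OrderBy(r => r.Start).ToList();
}
```

The ordering step matters: downstream operators expect events in time order regardless of which source produced them first.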
Let's look at an example that illustrates these concepts. Suppose that we have multiple temperature sensors deployed in different areas of a secure facility. Each sensor continuously emits events with the temperature of its area. Certain combinations of those values should trigger alert conditions that must be distributed to various systems.
The first step in implementing this scenario with StreamInsight is to model the event type produced by the sensors. We can accomplish this by annotating a struct with the EventType and EventTypeField attributes, as illustrated in the following code.
[EventType]
public struct SensorReading
{
    // Identifier of the sensor that produced the reading
    [EventTypeField(0)]
    public string SensorId { get; set; }

    // Temperature reported by the sensor
    [EventTypeField(1)]
    public int Temperature { get; set; }
}
After creating the event type, we can create event streams based on it. For simplicity, this sample uses two input streams.
// Each input stream is identified by name; the names are bound to adapters later
CepStream<SensorReading> sensor1Stream =
    CepStream.CreateInputStream<SensorReading>("sensor1stream");
CepStream<SensorReading> sensor2Stream =
    CepStream.CreateInputStream<SensorReading>("sensor2stream");
We can combine the data from our input streams using a LINQ join operation, as illustrated in the following code. The query produces an alert for every pair of readings in which the first sensor's stream reports a temperature above 100 and the second sensor's stream reports a temperature above 50.
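// "on true equals true" places no condition on the payloads, so every pair of
// (temporally overlapping) events is combined; the where clause filters the pairs
var query = from s1 in sensor1Stream
            join s2 in sensor2Stream
            on true equals true
            where s1.Temperature > 100 && s2.Temperature > 50
            select new { Condition = "ALERT", sensor1 = s1.Temperature,
                         sensor2 = s2.Temperature };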
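StreamInsight's join is temporal: as we understand its semantics, it pairs only events whose lifetimes overlap, and the resulting event is valid for the intersection of the two lifetimes. The following plain C# sketch (not StreamInsight code) mimics that behavior for the query above. The `IntervalEvent` and `Alert` records and the `JoinSketch.TemporalJoin` method are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical interval event: a payload valid over [Start, End)
public record IntervalEvent(DateTime Start, DateTime End, int Temperature);

// Hypothetical join result carrying the intersected lifetime
public record Alert(DateTime Start, DateTime End, int Sensor1, int Sensor2);

public static class JoinSketch
{
    public static List<Alert> TemporalJoin(
        IEnumerable<IntervalEvent> left, IEnumerable<IntervalEvent> right) =>
        (from s1 in left
         from s2 in right                                   // every pair, like "on true equals true"
         let start = s1.Start > s2.Start ? s1.Start : s2.Start
         let end = s1.End < s2.End ? s1.End : s2.End
         where start < end                                  // lifetimes must overlap
         where s1.Temperature > 100 && s2.Temperature > 50  // same filter as the query
         select new Alert(start, end, s1.Temperature, s2.Temperature)).ToList();
}
```

Intersecting the lifetimes means an alert is only valid while both contributing readings are valid, which is why non-overlapping readings never produce an alert even when their values qualify.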
For the sake of this example, we are going to use the text file adapter included in the StreamInsight SDK samples. The following code creates the server, the application, and both the input and output adapters.
// Create the server, an application, and the text file adapters from the SDK samples
Server cepServer = Server.Create();
Application cepApp = cepServer.CreateApplication("SampleCEPApplication");
InputAdapter fileInputAdapter =
    cepApp.CreateInputAdapter<TextFileInputFactory>("sampleadapter", "");
OutputAdapter fileOutputAdapter =
    cepApp.CreateOutputAdapter<TextFileOutputFactory>("sampleadapter", "");
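The internals of the sample text file adapter are not covered here. As a hypothetical sketch of what reading one line of an input file involves, the following parses a tab-delimited line into an interval (start and end time) plus a payload matching SensorReading. The column layout (start time, end time, sensor ID, temperature) is an assumption inferred from the sample input files shown later; `ParsedReading` and `LineParser.ParseLine` are hypothetical names.

```csharp
using System;
using System.Globalization;

// Hypothetical parsed row: interval lifetime plus the SensorReading payload fields
public record ParsedReading(DateTime Start, DateTime End, string SensorId, int Temperature);

public static class LineParser
{
    // Timestamps in the sample files look like "6/25/2009 12:00:00 AM"
    const string TimeFormat = "M/d/yyyy h:mm:ss tt";

    // Assumed layout: start <delim> end <delim> sensor id <delim> temperature
    public static ParsedReading ParseLine(string line, char delimiter = '\t')
    {
        string[] fields = line.Split(delimiter);
        if (fields.Length != 4)
            throw new FormatException($"Expected 4 fields, got {fields.Length}: {line}");

        return new ParsedReading(
            DateTime.ParseExact(fields[0], TimeFormat, CultureInfo.InvariantCulture),
            DateTime.ParseExact(fields[1], TimeFormat, CultureInfo.InvariantCulture),
            fields[2],
            int.Parse(fields[3], CultureInfo.InvariantCulture));
    }
}
```

Parsing with the invariant culture keeps the result independent of the machine's regional settings, which matters for the month/day order in these timestamps.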
At this point we have created all the design-time StreamInsight artifacts our application needs. The next step is to bind the query and adapters to the physical locations that contain the event data. We do that by creating a query template from our sample query and wrapping it in a QueryBinder object, which also associates each adapter with its specific configuration.
// Register the query as a template and wrap it in a binder
QueryTemplate template = cepApp.CreateQueryTemplate("samplequery", query);
QueryBinder binder = new QueryBinder(template);

// Each input stream reads its own tab-delimited file
var sensor1InputConf = new TextFileInputConfig
{
    InputFileName = "input1.csv",
    Delimiter = '\t',
    CTIfrequency = 9
};
var sensor2InputConf = new TextFileInputConfig
{
    InputFileName = "input2.csv",
    Delimiter = '\t',
    CTIfrequency = 9
};

// The output adapter uses the named wait handle "StopAdapter" to signal that it has stopped
var outputConfig = new TextFileOutputConfig
{
    OutputFileName = String.Empty,
    Delimiter = '\t',
    AdapterStopSignal = "StopAdapter"
};

// Bind both named input streams to the input adapter as interval events
binder.BindInputStream<SensorReading, TextFileInputConfig>
    ("sensor1stream", fileInputAdapter, EventShape.Interval, sensor1InputConf);
binder.BindInputStream<SensorReading, TextFileInputConfig>
    ("sensor2stream", fileInputAdapter, EventShape.Interval, sensor2InputConf);
// Send the query results to the output adapter as fully ordered point events
binder.AddQueryConsumer<TextFileOutputConfig>
    ("queryresult", fileOutputAdapter, outputConfig, EventShape.Point,
     StreamEventOrder.FullyOrdered);
To run the application, we create a Query object from the binder, start it, and wait until the output adapter signals that it has stopped.
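// Requires: using System.Threading; (EventWaitHandle, EventResetMode)
Query cepQuery = cepApp.CreateQuery("samplequery", binder, "");
// Named, manually reset wait handle that the output adapter signals when it stops
EventWaitHandle adapterStopSignal = new EventWaitHandle(false,
    EventResetMode.ManualReset, "StopAdapter");
cepQuery.Start();
// Block until the output adapter reports that it has finished, then stop the query
adapterStopSignal.WaitOne();
cepQuery.Stop();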
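The start-then-wait pattern relies on a manual-reset wait handle that another component sets. The following standalone sketch shows the same coordination with an unnamed EventWaitHandle and a background task standing in for the output adapter; it does not use StreamInsight, and `StopSignalDemo.RunAndWait` is a hypothetical name. The sketch avoids a named handle such as "StopAdapter" because named events may not be supported on non-Windows .NET platforms.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class StopSignalDemo
{
    // Runs a simulated "adapter" on a background task and blocks until it signals completion.
    public static int RunAndWait(int eventsToProcess)
    {
        // Manual reset: once set, the handle stays signaled until explicitly reset
        using var stopSignal = new EventWaitHandle(false, EventResetMode.ManualReset);
        int processed = 0;

        Task adapter = Task.Run(() =>
        {
            for (int i = 0; i < eventsToProcess; i++)
                Interlocked.Increment(ref processed);   // stand-in for emitting events
            stopSignal.Set();                           // tell the waiting thread we are done
        });

        stopSignal.WaitOne();   // same role as adapterStopSignal.WaitOne() in the sample
        adapter.Wait();         // make sure the task has fully completed before disposing
        return Volatile.Read(ref processed);
    }
}
```

A manual-reset handle is a good fit here because the "stopped" condition is permanent: any thread that checks it after the signal should proceed without blocking.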
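Suppose we run this example with the following tab-delimited input files. Each line contains the event start time, end time, sensor ID, and temperature.
input1.csv: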
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 70
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 100
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor2 45
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 56
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor2 102
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 103
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 150
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor2 66
input2.csv:
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 70
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 105
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor2 50
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 58
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor2 111
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 101
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor1 150
6/25/2009 12:00:00 AM 6/25/2009 12:00:20 AM sensor2 66
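Because the join ignores SensorId, every reading above 100 in input1.csv is paired with every reading above 50 in input2.csv, which produces the following output.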
INSERT 6/25/2009 12:00:00 AM ALERT 102 58
INSERT 6/25/2009 12:00:00 AM ALERT 102 66
INSERT 6/25/2009 12:00:00 AM ALERT 102 70
INSERT 6/25/2009 12:00:00 AM ALERT 102 101
INSERT 6/25/2009 12:00:00 AM ALERT 102 105
INSERT 6/25/2009 12:00:00 AM ALERT 102 111
INSERT 6/25/2009 12:00:00 AM ALERT 102 150
INSERT 6/25/2009 12:00:00 AM ALERT 103 58
INSERT 6/25/2009 12:00:00 AM ALERT 103 66
INSERT 6/25/2009 12:00:00 AM ALERT 103 70
INSERT 6/25/2009 12:00:00 AM ALERT 103 101
INSERT 6/25/2009 12:00:00 AM ALERT 103 105
INSERT 6/25/2009 12:00:00 AM ALERT 103 111
INSERT 6/25/2009 12:00:00 AM ALERT 103 150
INSERT 6/25/2009 12:00:00 AM ALERT 150 58
INSERT 6/25/2009 12:00:00 AM ALERT 150 66
INSERT 6/25/2009 12:00:00 AM ALERT 150 70
INSERT 6/25/2009 12:00:00 AM ALERT 150 101
INSERT 6/25/2009 12:00:00 AM ALERT 150 105
INSERT 6/25/2009 12:00:00 AM ALERT 150 111
INSERT 6/25/2009 12:00:00 AM ALERT 150 150
CTI 12/31/9999 11:59:59 PM
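As a quick check of the output above, the following plain LINQ to Objects sketch (not StreamInsight code) applies the query's predicate to the temperature columns of the two input files. Every line in the sample files has the same 12:00:00 to 12:00:20 interval, so every pair overlaps and the temporal aspect of the join can be ignored here; `AlertCheck` is a hypothetical name.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class AlertCheck
{
    // Temperature column of input1.csv and input2.csv, in file order
    public static readonly int[] Input1 = { 70, 100, 45, 56, 102, 103, 150, 66 };
    public static readonly int[] Input2 = { 70, 105, 50, 58, 111, 101, 150, 66 };

    // Cross join with the same filter as the StreamInsight query,
    // ordered like the output listing (by sensor1, then sensor2)
    public static List<(int Sensor1, int Sensor2)> Alerts() =>
        (from s1 in Input1
         from s2 in Input2
         where s1 > 100 && s2 > 50
         orderby s1, s2
         select (Sensor1: s1, Sensor2: s2)).ToList();
}
```

Three readings in input1.csv exceed 100 (102, 103, 150) and seven in input2.csv exceed 50, so the check yields 3 × 7 = 21 pairs, matching the 21 INSERT lines. Note that 100 and 50 are excluded because the comparisons are strict.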