Using Complex Event Processing (CEP) with Microsoft StreamInsight to Analyze Twitter Tweets 6: The Sample Application 2

Note: This post is one of a series; the overview can be found here: Complex Event Processing with StreamInsight

The sample code for the application can be downloaded here:

Putting it all together

Now that we have written our TweetItem, TwitterStream and Unsubscriber classes, we can put it all together and write the actual StreamInsight code. We start by implementing the Main method.

First we define the StreamInsight Server. In the Create() method we specify the name of the StreamInsight instance as we defined it during the installation. Then we create a StreamInsight application that will hold our data sources, sinks and queries.
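
As a rough sketch of this step, the skeleton of the Main method could look like the following. The instance name "Default" and the application name "TwitterStats" are placeholders chosen for illustration; use the instance name you picked during installation. The sketch assumes a reference to the Microsoft.ComplexEventProcessing assembly.

```csharp
using System;
using Microsoft.ComplexEventProcessing;

class Program
{
    static void Main(string[] args)
    {
        // "Default" is a placeholder: pass the instance name chosen during installation.
        using (Server server = Server.Create("Default"))
        {
            // "TwitterStats" is a hypothetical application name.
            Application app = server.CreateApplication("TwitterStats");

            // Data sources, sinks and queries are defined against 'app' here.
        }
    }
}
```

Wrapping the server in a using block is a design choice of this sketch; it makes sure the embedded server is disposed when Main exits.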


Next, we define our data sink. We write a method that prints an event to the console. An if statement makes sure that we only print the insert events we are interested in and skip the CTIs (current time increments). We implement it as a generic method with a type parameter for the payload. This allows us to reuse it when we have different queries with different output POCOs:
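
A minimal sketch of such a method is shown here. The enclosing class name ConsoleSink and the output format are assumptions made for this example; the payload is printed through its ToString() method, so a POCO without a ToString() override will only print its type name.

```csharp
using System;
using Microsoft.ComplexEventProcessing;

static class ConsoleSink
{
    // Prints insert events only. CTI events carry no payload and are skipped.
    public static void ConsoleWritePointNoCTI<TPayload>(PointEvent<TPayload> e)
    {
        if (e.EventKind == EventKind.Insert)
        {
            // StartTime is the timestamp of the point event.
            Console.WriteLine("{0}\t{1}", e.StartTime, e.Payload);
        }
    }
}
```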


Following the StreamInsight 2.1 approach, we can create a sink around the method above and hook it to the application in one line. Note: You have to include the Microsoft.ComplexEventProcessing.Linq namespace in order to see the DefineObserver() extension method.


It is only one line, but in my opinion it is hard to understand. Let’s pick it apart and analyze what is going on:


We use the static Create() method of the Observer class to create an Observer instance that can observe PointEvent<object>, and we point it to the ConsoleWritePointNoCTI method, which accepts an element of type PointEvent<object> as a parameter.
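
To make the two steps visible, the statement can be split up roughly as follows. This is a sketch under a few assumptions: it is a fragment that lives inside Main after the application has been created, it uses TweetItem as the payload type so that the sink matches a stream of tweets, and the variable names createObserver and sink are made up for illustration.

```csharp
// Fragment, assumed to run inside Main where 'app' is the StreamInsight application.
// Requires: using System; using System.Linq.Expressions; using System.Reactive;
//           using Microsoft.ComplexEventProcessing; using Microsoft.ComplexEventProcessing.Linq;

// Step 1: describe how to build a plain Rx observer that forwards
// every point event to our console method.
Expression<Func<IObserver<PointEvent<TweetItem>>>> createObserver =
    () => Observer.Create<PointEvent<TweetItem>>(ConsoleWritePointNoCTI);

// Step 2: register that description with the application.
// The result is a sink that StreamInsight can bind to a stream.
var sink = app.DefineObserver(createObserver);
```

Note that DefineObserver() receives an expression that describes how to create the observer, not a finished observer instance.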

Next, we create our data source from the TwitterStream.cs class.


Here as well, we can write it in one line, but it is even more complicated than the last statement. Let’s look at what is going on:


We can see in the code that we begin by creating an instance of our TwitterStream class. Remember that this is the class where we implemented the IObservable<TweetItem> interface.

Then we use the DefineObservable() extension method from the StreamInsight assembly to convert the IObservable into an IQbservable. From there, we convert the IQbservable into an IQStreamable. In this call we define the structure of our PointEvent items and instruct StreamInsight to add CTIs to the stream.
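
The chain of conversions can be sketched like this. It is a fragment for the Main method and rests on some assumptions: TwitterStream is assumed to have a parameterless constructor, the arrival time DateTimeOffset.Now is used as the event timestamp, and AdvanceTimeSettings.IncreasingStartTime is picked as one possible CTI setting.

```csharp
// Fragment, assumed to run inside Main where 'app' is the StreamInsight application.
// Requires: using System; using System.Reactive.Linq;
//           using Microsoft.ComplexEventProcessing; using Microsoft.ComplexEventProcessing.Linq;

// IObservable -> IQbservable: register the Rx source with the application.
// A parameterless TwitterStream constructor is an assumption of this sketch.
IQbservable<TweetItem> observable = app.DefineObservable(() => new TwitterStream());

// IQbservable -> IQStreamable: wrap every tweet in a point event and
// let StreamInsight generate CTIs as the start times advance.
IQStreamable<TweetItem> source = observable.ToPointStreamable(
    tweet => PointEvent.CreateInsert(DateTimeOffset.Now, tweet),
    AdvanceTimeSettings.IncreasingStartTime);
```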

Finally, we create a binding that connects the data source and the data sink, and we call the Run() method on that binding. In this first example, we do not yet use a query; we simply pass all the data that we receive from the source directly to the sink.
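
A sketch of this last step follows. It assumes that source and sink are the variables holding the data source and the data sink defined in the previous steps; waiting for the Enter key is simply one way to keep the console application alive while events arrive.

```csharp
// Fragment, assumed to run inside Main after 'source' and 'sink' have been defined.
// Bind() connects the stream to the sink; Run() starts the process.
using (source.Bind(sink).Run())
{
    // Keep the process alive until the user presses Enter.
    Console.WriteLine("Press <Enter> to stop.");
    Console.ReadLine();
}
```

Disposing the object returned by Run() stops the running process, which is why the sketch wraps it in a using block.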


And that’s it! Hit F5 and HERE WE GO!

The output should look something like this:


Note: The question marks come from languages with characters that cannot be printed by the standard console.

Bravo! You can now receive live Twitter data processed by StreamInsight!

Next week, we will write our first queries and process the data between the source and the sink.