
Using Complex Event Processing (CEP) with Microsoft StreamInsight to Analyze Twitter Tweets 6: The Sample Application 2

Note: This post is part of a series; the overview can be found here: Complex Event Processing with StreamInsight

The sample code for the application can be downloaded here:

Putting it all together

Now that we have written our TweetItem, TwitterStream and Unsubscriber classes, we can put it all together and write the actual StreamInsight code. We start by implementing the Main method.

First, we create the StreamInsight server. In the Create() method we pass the name of the StreamInsight instance that we defined during installation. Then we create a StreamInsight application that will hold our data sources, sinks, and queries.

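A minimal sketch of this step, assuming references to the StreamInsight 2.1 assemblies and an instance installed under the name "Default"; substitute the instance name you chose during installation. The application name "TwitterApp" is likewise only an illustrative choice.

```csharp
using Microsoft.ComplexEventProcessing;

partial class Program
{
    static void Main()
    {
        // "Default" is an assumed instance name; use the one from your installation.
        using (Server server = Server.Create("Default"))
        {
            // The application holds the sources, sinks and queries defined next.
            Application app = server.CreateApplication("TwitterApp");

            // Sink, source and binding are defined here in the following steps.
        }
    }
}
```

Disposing the Server at the end of the using block shuts down the embedded StreamInsight server again.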

Next, we define our data sink. We write a method that prints each event to the console. An if statement prints only the insert events we are interested in and skips the CTIs (Current Time Increments). We implement it as a generic method with a type parameter for the payload, which allows us to reuse it for different queries with different output POCOs:

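A sketch of such a method, assuming StreamInsight 2.1's PointEvent<TPayload> type with its EventKind, StartTime and Payload properties. The output format is an illustrative choice, and printing e.Payload relies on the payload type overriding ToString(). The method sits in a partial Program class so it can live next to Main.

```csharp
using System;
using Microsoft.ComplexEventProcessing;

partial class Program
{
    // Generic console sink: TPayload lets the same method serve queries with
    // different output POCOs. Only insert events are printed; CTIs are skipped.
    public static void ConsoleWritePointNoCTI<TPayload>(PointEvent<TPayload> e)
    {
        if (e.EventKind == EventKind.Insert)
        {
            // Uses the payload's ToString() to produce a readable line.
            Console.WriteLine("{0}: {1}", e.StartTime, e.Payload);
        }
    }
}
```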

Following the StreamInsight 2.1 approach, we can wrap the method above in a sink and register it with the application in a single line. Note: You have to add a using directive for the Microsoft.ComplexEventProcessing.Linq namespace in order to see the DefineObserver() extension method.


It is only one line, but in my opinion it is hard to understand. Let’s pick it apart and analyze what is going on:


We use the static Create() method of the Observer class to create an observer that can observe PointEvent<object>, and we point it to the ConsoleWritePointNoCTI method, which accepts a PointEvent<object> as its parameter. DefineObserver() then registers this observer with the application as our sink.
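Picked apart into two statements, the sink definition might look roughly like the sketch below. It assumes the app variable and the ConsoleWritePointNoCTI method from the previous steps, plus references to StreamInsight 2.1 and Rx (Observer.Create lives in the System.Reactive namespace). The sketch types the sink as PointEvent<TweetItem> rather than PointEvent<object> so that it lines up with the TweetItem payload of the source; treat that type choice and the variable names observerFactory and sink as assumptions, not as the original code.

```csharp
// Statements inside Main, after 'app' has been created.
// File-level usings needed: System, System.Linq.Expressions,
// System.Reactive (Observer.Create), Microsoft.ComplexEventProcessing,
// Microsoft.ComplexEventProcessing.Linq (DefineObserver).

// 1) A factory expression that builds an Rx observer forwarding every
//    PointEvent to our console method. StreamInsight receives the expression,
//    not an observer instance, and creates the observer itself.
Expression<Func<IObserver<PointEvent<TweetItem>>>> observerFactory =
    () => Observer.Create<PointEvent<TweetItem>>(ConsoleWritePointNoCTI);

// 2) Register the factory with the application; the result is our sink.
var sink = app.DefineObserver(observerFactory);
```

Passing the lambda directly to DefineObserver() gives back the one-liner; the intermediate variable only makes the expression type visible.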

Next, we create our data source from the TwitterStream.cs class.


Here, too, we could write it in one line, but it would be even more complicated than the previous statement. Let’s look at what is going on:


We can see in the code that we begin by creating an instance of our TwitterStream class. Remember that this is the class where we implemented the IObservable<TweetItem> interface.

Then we use the DefineObservable() extension method from the StreamInsight assembly to convert the IObservable into an IQbservable. From there, we convert the IQbservable into an IQStreamable with ToPointStreamable(). In this call we define how each TweetItem becomes a PointEvent (stamped with its CreationDate) and instruct StreamInsight to add CTIs to the stream.
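Picked apart, the source definition could look like the sketch below. It is based on the one-liner quoted in the comments at the end of this post and assumes the app variable, the TwitterStream class, and a TweetItem.CreationDate property usable as the event timestamp; the intermediate variable name tweetQbservable is hypothetical.

```csharp
// Statements inside Main, after 'app' has been created.

// 1) Wrap our IObservable<TweetItem> as an IQbservable<TweetItem>.
//    The lambda is passed as an expression; StreamInsight evaluates it itself
//    to create the TwitterStream instance when the binding is run.
var tweetQbservable = app.DefineObservable(() => new TwitterStream());

// 2) Turn the IQbservable into an IQStreamable of point events: each tweet
//    becomes an insert event stamped with its CreationDate, and
//    IncreasingStartTime tells StreamInsight to add CTIs to the stream.
var twitterStreamable = tweetQbservable.ToPointStreamable(
    tweet => PointEvent.CreateInsert(tweet.CreationDate, tweet),
    AdvanceTimeSettings.IncreasingStartTime);
```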

Finally, we create a binding that connects the data source to the data sink, and we call the Run() method on that binding. In this first example we do not use a query yet; all the data received by the source goes directly to the sink.

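A minimal sketch of this last step, assuming a source named twitterStreamable and a sink named sink, defined as described above, with the sink observing PointEvent<TweetItem>. The process name "TwitterProcess" is hypothetical. Run() returns an IDisposable representing the running process; wrapping it in a using block and waiting on Console.ReadLine() is our assumption about how to keep the console application alive, not something the post shows.

```csharp
// Statements inside Main, after the source and the sink have been defined.
// "TwitterProcess" is a hypothetical process name.
using (twitterStreamable.Bind(sink).Run("TwitterProcess"))
{
    // No query yet: every event from the source goes straight to the sink.
    // Keep the process alive until the user presses ENTER; disposing the
    // object returned by Run() stops the process.
    Console.WriteLine("Listening to Twitter. Press ENTER to stop.");
    Console.ReadLine();
}
```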

And that’s it! Hit F5 and HERE WE GO!

The output should look something like this:


Note: The question marks stand for characters from languages that the standard console cannot display.

Bravo! You can now receive live Twitter data processed by StreamInsight!

Next week, we will write our first queries and process the data between the source and the sink.

  • Vu Nguyen

    Hi Manuel,

    I got your example running on my machine, and I noticed something interesting.
    Notice the inline new TwitterStream() in the code below:

    var twitterStreamable = app.DefineObservable(() => new TwitterStream())
        .ToPointStreamable(
            tweet => PointEvent.CreateInsert(tweet.CreationDate, tweet),
            AdvanceTimeSettings.IncreasingStartTime);

    When I used a separate instance instead of the inline one, like this:

    var twitterStream = new TwitterStream();

    var twitterStreamable = app.DefineObservable(() => twitterStream)
        .ToPointStreamable(
            tweet => PointEvent.CreateInsert(tweet.CreationDate, tweet),
            AdvanceTimeSettings.IncreasingStartTime);

    I got the error: Additional information: Cannot serialize value of type ‘TwitterStream’.
    After I marked TwitterStream with DataContractAttribute and its Observer list with DataMember, the serialization error went away.

    However, no results were returned, and while debugging I noticed that no StreamInsight observer instance was ever added to the TwitterStream.Observer list.

    I don’t know the reason… I know that StreamInsight serializes everything, but I already marked TwitterStream with [DataContract].

    Do you have any idea about this?
    Please let me know your feedback.

    Best Regards,
    Vu

  • Manuel

    Hi Vu,
    That seems very strange. Sorry but nothing comes to my mind at the moment. 🙁
    Rgds MM

  • Vu Nguyen

    Hi Manuel,

    This can be a problem if the TwitterStream instance is also used for something else in the code, is created by a factory, or is obtained through dependency injection.

    I already tried a stream factory that creates the Twitter stream, and the error was that StreamInsight needs to serialize my factory. If the factory cannot be serialized, it won’t work, and I don’t think there is any reason the factory should need to be serialized.

    The same issue applies to dependency injection.

    I know that you are very busy, but if you have any spare time, I wonder if you could get your sample back out and have a look at it.

    I think this is interesting.

    Regards,
    Vu