Stairway to StreamInsight

Pulling real-time data from Twitter using StreamInsight


Introduction

This article describes an example scenario in Microsoft SQL Server 2008 R2 StreamInsight: reading real-time data from Twitter. It assumes that you have basic knowledge of StreamInsight. If you need an introduction, see the previous articles in this series.

Background

StreamInsight is a platform for Complex Event Processing (CEP) that enables the processing of high-speed data streams from multiple data sources. It is one of the new components of SQL Server 2008 R2. Example application areas for CEP include financial applications, fraud detection, manufacturing, surveillance of internet traffic and real-time business intelligence.

Twitter is a good data source for StreamInsight because it provides a streaming API. It is also a data source that is valuable to monitor for various kinds of applications.

The Twitter Streaming API

Twitter provides an API for near-real-time access to public Twitter content. The API is documented on Twitter's developer site. It contains methods to retrieve a stream of statuses (or "tweets", as they are also called). Some examples of the API methods are:

statuses/filter

Returns public statuses (tweets) that match one or more filter predicates.

Possible parameters: count, delimited, follow, locations, track

statuses/firehose

Returns all public statuses. Restricted to users with high access levels.

Possible parameters: count, delimited

statuses/sample

Returns a random sample of all public statuses. The default user access level gives about 1% of all public statuses.

Possible parameters: count, delimited

Authentication and access levels

Currently any Twitter user can use the Twitter streaming API: you just supply your ordinary username and password. However, there are different access levels for users that limit the rate at which you can receive statuses.

Twitter is also extending the API with "user streams" for desktop applications and "site streams" for web sites or mobile push services. These will probably require that you've been approved by Twitter to use them.

Note that the authentication and access levels may change in the future!

Modelling Twitter data in StreamInsight

Events in StreamInsight can be of three temporal types: Point, Interval and Edge. Since a Twitter message is something that happens at a point in time, with no specific duration, Point events are the most suitable choice. If necessary, the point events can easily be transformed into interval or edge events.

The event data, which is called the "payload" in StreamInsight, is modelled using a C# class (or similar in other .NET languages). The payload class may consist only of fields or properties of simple types: no nested types and no collections except byte arrays. Therefore we cannot make a "generic" format that would support any kind of Twitter information. Instead we must have fixed fields that contain the information we are interested in.

Let's have a look at a sample input we get from the Twitter streaming API (in JSON format):

{"text":"@SJSNSDfacts you're my endless love","in_reply_to_status_id":22794441054,"retweet_count":null,"contributors":null,"created_at":"Thu Sep 02 12:41:59 +0000 2010","geo":null,"source":"web","coordinates":null,"in_reply_to_screen_name":"SJSNSDfacts","truncated":false,"entities":{"user_mentions":[{"indices":[0,12],"screen_name":"SJSNSDfacts","name":"SJSNSDSG","id":175075653}],"urls":[],"hashtags":[]},"retweeted":false,"place":null,"user":{"friends_count":52,"profile_sidebar_fill_color":"121203","location":"Secret Garden","verified":false,"follow_request_sent":null,"favourites_count":20,"profile_sid

You can also get another kind of event from the Twitter streaming API, the delete event. A delete event simply means that a previous message should be removed. Below is a sample:

{"delete":{"status":{"id":22794449900,"user_id":185635187}}}

We now pick out the fields we are interested in and make a payload class:

using System;

public class Tweet
{
    // Fields of an ordinary status (tweet)
    public Int64 ID { get; set; }
    public DateTime CreatedAt { get; set; }
    public string Text { get; set; }
    public string Source { get; set; }
    public string UserLang { get; set; }
    public int UserFriendsCount { get; set; }
    public string UserDescription { get; set; }
    public string UserTimeZone { get; set; }
    public string UserURL { get; set; }
    public string UserName { get; set; }
    public Int64 UserID { get; set; }
    // Fields of a delete event; left at 0 for ordinary statuses
    public Int64 DeleteStatusID { get; set; }
    public Int64 DeleteUserID { get; set; }
}

If we want to include collections, like user_mentions, we have to convert them into delimited strings or into byte arrays. That is outside the scope of this article.
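If you do want to keep the mentions, the delimited-string approach could look like the minimal sketch below. It is an illustration, not part of the adapter: the `MentionFlattener` class and its method are hypothetical, and it uses System.Text.Json (built into .NET Core 3.0 and later) instead of Json.NET only so that it compiles without extra packages. It reads `entities.user_mentions` from one status message and joins the `screen_name` values into a single string that fits in one payload field.

```csharp
using System.Collections.Generic;
using System.Text.Json;

public static class MentionFlattener
{
    // Hypothetical helper: flattens entities.user_mentions into "name1,name2,..."
    // so that the collection fits into a single string property of the payload.
    public static string MentionsAsDelimitedString(string statusJson, string separator = ",")
    {
        using (JsonDocument doc = JsonDocument.Parse(statusJson))
        {
            var names = new List<string>();

            // Missing or null "entities"/"user_mentions" simply yields an empty string.
            if (doc.RootElement.TryGetProperty("entities", out JsonElement entities) &&
                entities.TryGetProperty("user_mentions", out JsonElement mentions) &&
                mentions.ValueKind == JsonValueKind.Array)
            {
                foreach (JsonElement mention in mentions.EnumerateArray())
                {
                    names.Add(mention.GetProperty("screen_name").GetString());
                }
            }

            return string.Join(separator, names);
        }
    }
}
```

A comma is a safe separator here because screen names consist only of letters, digits and underscores; for free-text collections you would need a separator that cannot occur in the data, or an escaping scheme.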

When a source produces two kinds of events, as the Twitter streaming API does, we must also decide whether to merge them into the same payload or separate them into different streams. In StreamInsight an input adapter instance can only produce one kind of event (one payload type). That also applies to an untyped input adapter, where the type is determined when the adapter is instantiated. I've chosen to merge the two events into the same payload, but they could just as well be implemented as two separate payloads.
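A minimal sketch of how one line from the stream might be mapped onto the merged payload, assuming the two JSON shapes shown earlier: a message with a top-level "delete" property becomes a delete event, and anything else is treated as a status. It uses System.Text.Json (built into .NET Core 3.0 and later) rather than the Json.NET component used later in this article, only so that it runs without extra packages. The `TweetParser.Parse` helper and the trimmed-down `Tweet` class are hypothetical illustrations, not code from the adapter's source.

```csharp
using System.Text.Json;

// Trimmed-down copy of the merged payload: only the fields this sketch fills in.
public class Tweet
{
    public long ID { get; set; }
    public string Text { get; set; }
    public long DeleteStatusID { get; set; }
    public long DeleteUserID { get; set; }
}

public static class TweetParser
{
    // Hypothetical helper: maps one newline-delimited JSON message onto the merged payload.
    public static Tweet Parse(string line)
    {
        using (JsonDocument doc = JsonDocument.Parse(line))
        {
            JsonElement root = doc.RootElement;
            var tweet = new Tweet();

            if (root.TryGetProperty("delete", out JsonElement delete))
            {
                // Shape: {"delete":{"status":{"id":...,"user_id":...}}}
                JsonElement status = delete.GetProperty("status");
                tweet.DeleteStatusID = status.GetProperty("id").GetInt64();
                tweet.DeleteUserID = status.GetProperty("user_id").GetInt64();
            }
            else
            {
                // Ordinary status: the delete fields keep their default value 0.
                tweet.ID = root.GetProperty("id").GetInt64();
                tweet.Text = root.GetProperty("text").GetString();
            }

            return tweet;
        }
    }
}
```

With a merged payload, a downstream query can tell the two kinds apart by checking whether DeleteStatusID is non-zero.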

Now that we have our payloads it's time to build the input adapter.

Building the Input Adapter

The first decision is which Twitter streaming API methods the input adapter should support. I decided to support two of them, the "sample method" and the "filter method" (described earlier under The Twitter Streaming API), because they are available to any user. The input adapter should then:

  • establish a streaming connection with the Twitter server using either of these two methods
  • supply any parameters and credentials
  • transform everything it receives from the Twitter server into StreamInsight events

So what does the input adapter need? Let's summarize the above and also add a timeout field: the maximum time, in milliseconds, that we want to wait for a new event before we assume that something has gone wrong.

public class TwitterConfig
{
    public TwitterMode Mode { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
    public string Parameters { get; set; }
    public int Timeout { get; set; }
    public TwitterConfig()
    {
        Timeout = 300000; // Milliseconds: wait at most 5 minutes by default
        Mode = TwitterMode.Sample;
        Parameters = "";
    }
}

TwitterMode selects one of our two methods:

public enum TwitterMode
{
    Sample,
    Filter
}
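Before connecting, the adapter has to turn a TwitterConfig into a request URL. A minimal sketch of that step follows. The `TwitterUrl.Build` helper is hypothetical, and the base address is an assumption reflecting the version 1 streaming endpoints of the time (they have since been retired), so check the adapter's source for the address it actually uses. The enum is repeated so the block compiles on its own, and `Parameters` is assumed to hold an already URL-encoded query string such as `track=sql`.

```csharp
public enum TwitterMode
{
    Sample,
    Filter
}

public static class TwitterUrl
{
    // ASSUMPTION: base address of the historical v1 streaming API,
    // not taken from the adapter's source.
    private const string BaseUrl = "https://stream.twitter.com/1/statuses/";

    // Hypothetical helper: picks the endpoint from the mode and appends the
    // raw query string held in TwitterConfig.Parameters (if any).
    public static string Build(TwitterMode mode, string parameters)
    {
        string method = mode == TwitterMode.Filter ? "filter" : "sample";
        string url = BaseUrl + method + ".json";
        return string.IsNullOrEmpty(parameters) ? url : url + "?" + parameters;
    }
}
```

Keeping Parameters as a raw query string mirrors the config class above; a stricter design would store each parameter separately and escape the values with `Uri.EscapeDataString`.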

The input adapter should then establish an HTTP (or HTTPS) connection with the Twitter server, passing these parameters in the HTTP request. That can be done using the standard .NET HttpWebRequest class. The Twitter streaming API separates events with newline characters, so you can call ReadLine on a StreamReader wrapped around the response stream to get one event at a time. The basic structure of the code is:

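// Requires: using System.IO; using System.Net;
var request = (HttpWebRequest)WebRequest.Create(url);
request.Timeout = _config.Timeout;          // applies to GetResponse
request.ReadWriteTimeout = _config.Timeout; // applies to each read from the response stream
request.Credentials = new NetworkCredential(_config.Username, _config.Password);
using (var response = request.GetResponse())
using (var streamReader = new StreamReader(response.GetResponseStream()))
{
    string line;
    // One event per line; ReadLine returns null when the server closes the stream
    while ((line = streamReader.ReadLine()) != null)
    {
        ...
    }
}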
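The line-by-line reading can be tried without a network connection. In this minimal sketch (the `DelimitedStream` class is hypothetical) an in-memory stream stands in for `response.GetResponseStream()`, and each non-empty line is yielded as one JSON message. Skipping blank lines is an assumption: a streaming server may send empty lines to keep an idle connection open, and those should not become events.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class DelimitedStream
{
    // Yields one JSON message per line from a newline-delimited stream.
    // ASSUMPTION: blank lines are keep-alives and are skipped.
    public static IEnumerable<string> ReadMessages(Stream stream)
    {
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            string line;
            // ReadLine accepts both "\n" and "\r\n" as line terminators.
            while ((line = reader.ReadLine()) != null)
            {
                if (line.Trim().Length == 0)
                    continue;
                yield return line;
            }
        }
    }
}
```

Because the method is an iterator, messages are handed out as soon as each line arrives rather than after the whole (endless) stream has been read.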

We also need to deal with the JSON format. The Twitter streaming API also supports XML, but JSON is preferred because it saves bandwidth. Instead of implementing my own JSON parser, I've chosen to use a free component available on CodePlex, Newtonsoft Json.NET, which is easy to use. Assuming that we have the JSON data in a variable called "line", we can pull out the interesting data as follows:

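// Requires Json.NET: using Newtonsoft.Json.Linq;
var jObject = JObject.Parse(line);
var tweet = new Tweet();
// Status messages only: a delete message has no top-level "id" or "text",
// so SelectToken would return null for it
tweet.ID = jObject.SelectToken("id").Value<Int64>();
tweet.Text = jObject.SelectToken("text").Value<string>();
...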
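The `created_at` field needs extra care, because its format (`Thu Sep 02 12:41:59 +0000 2010` in the sample above) is not one that .NET parses by default. A minimal sketch using `DateTimeOffset.ParseExact` with a custom format string follows; the `TwitterDate` class is a hypothetical helper, not part of the adapter's source.

```csharp
using System;
using System.Globalization;

public static class TwitterDate
{
    // Format of created_at in the sample message, e.g. "Thu Sep 02 12:41:59 +0000 2010".
    // "zzz" accepts an offset written as "+0000" as well as "+00:00".
    private const string CreatedAtFormat = "ddd MMM dd HH:mm:ss zzz yyyy";

    public static DateTimeOffset ParseCreatedAt(string value)
    {
        // InvariantCulture so that "Thu" and "Sep" parse regardless of the machine's locale.
        return DateTimeOffset.ParseExact(value, CreatedAtFormat, CultureInfo.InvariantCulture);
    }
}
```

To store the value in the payload, assign `ParseCreatedAt(...).UtcDateTime` to `Tweet.CreatedAt`, which keeps all timestamps in UTC.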

Finally, the input adapter needs to submit the tweets as events to StreamInsight. It also needs to enqueue CTI (Current Time Increment) events, which tell the engine that no further events will arrive with a timestamp earlier than the CTI's.

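// Produce an INSERT event stamped with the current time
var now = DateTimeOffset.Now;
currEvent = CreateInsertEvent();
currEvent.StartTime = now;
currEvent.Payload = tweet;
// Enqueue can return EnqueueOperationResult.Full; production code must then
// release the event and retry after the engine calls Resume()
Enqueue(ref currEvent);
// Also send a CTI: no later event will start before "now"
EnqueueCtiEvent(now);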
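One subtlety with stamping events from the system clock: a CTI promises that no later event will start before it, but DateTimeOffset.Now can jump backwards, for example after a clock adjustment. One way to guard against that is a timestamp source that never goes backwards, sketched below. The `EventClock` class is hypothetical and is not part of StreamInsight or the adapter's source; the adapter would call `Next()` once per tweet and use the result both as the event's StartTime and for EnqueueCtiEvent.

```csharp
using System;

// Hypothetical helper (not part of StreamInsight): hands out timestamps that
// never move backwards, so no event is stamped earlier than a CTI already sent.
public sealed class EventClock
{
    private readonly Func<DateTimeOffset> _now;
    private DateTimeOffset _last = DateTimeOffset.MinValue;

    public EventClock(Func<DateTimeOffset> now)
    {
        _now = now;
    }

    public DateTimeOffset Next()
    {
        DateTimeOffset t = _now();
        if (t < _last)
            t = _last; // the clock went backwards: reuse the last timestamp
        _last = t;
        return t;
    }
}
```

Injecting the clock as a `Func<DateTimeOffset>` keeps the class testable; in the adapter you would pass `() => DateTimeOffset.Now`.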

The complete source of the adapter is available in the CodePlex project listed under Source Code at the end of this article.

Adding a User Interface

To test the input adapter, we need an output adapter and a user interface that lets the user pick a few options and displays the incoming stream on the screen.

To display data in a window, the output adapter needs to communicate with the window class. There are many options for such communication. I've chosen to use WCF (Windows Communication Foundation) for the task: the window acts as a WCF host and the output adapter as a WCF client.

In the WCF host you have to create an instance of System.ServiceModel.ServiceHost and make it listen on a communications channel (in this case a named pipe).

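// The window itself is the service instance, so its class must be marked
// [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
serviceHost = new ServiceHost(this, new Uri("net.pipe://localhost"));
serviceHost.AddServiceEndpoint(typeof(IStackerCtl), new NetNamedPipeBinding(), PipeName);
serviceHost.Open();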

You also define the service contract, an interface that the WCF host implements:

[ServiceContract]
public interface IStackerCtl
{
    [OperationContract]
    void Push(string message);
}

In the output adapter you need to connect to the WCF host, which can be done in the constructor.

ChannelFactory<IStackerCtl> stackerCtlFactory =
 new ChannelFactory<IStackerCtl>(new NetNamedPipeBinding(),
 new EndpointAddress("net.pipe://" + config.StackerCtlHostName + "/" + config.StackerCtlPipeName));
stackerCtl = stackerCtlFactory.CreateChannel();

Then, in the main loop of the adapter, you can simply call stackerCtl.Push(). It is good practice to wrap the call in some kind of error handling. In this case we just stop the adapter if communication fails.

 try
 {
     stackerCtl.Push(msg);
 }
 catch
 {
     stopFlag = true;
 }

Conclusion

This article describes how to pull Twitter data into StreamInsight using the Twitter streaming API. Having a streaming API greatly simplifies writing an input adapter. Also, the streaming API is currently accessible to any Twitter user and is therefore available to anyone.

Note that the Twitter adapter in this article is intended for educational purposes only. For simplicity it does not contain the error handling and other features that should be present in a production environment.

Source Code

The complete source code for this example is available in this CodePlex project.

About the Author

Johan Åhlén is an MCITP and one of the StreamInsight pioneers. He is one of the main contributors to the public samples in the CodePlex StreamInsight project. Johan has more than 12 years of experience with SQL Server and is the leader of the Swedish SQL Server User Group. In his daily work, Johan is an independent consultant working with StreamInsight and Business Intelligence solutions for financial companies.

The Series

This article is part of a series that is designed to help you quickly understand and begin using StreamInsight. You can view the entire series here.

 

This article is part of the parent stairway Stairway to StreamInsight
