Stairway to StreamInsight

Getting started with StreamInsight


Introduction

This article is the second in a series describing Microsoft SQL Server 2008 R2 StreamInsight. While the first article described the background and application areas, this article describes how you can get started with StreamInsight, with an example of running queries against financial data.

You can see the entire Stairway to StreamInsight series summary here.

Background

StreamInsight is a platform for Complex Event Processing (CEP) that enables the processing of high-speed data streams from multiple data sources. It is one of the new components of SQL Server 2008 R2. Example application areas for CEP include financial applications, fraud detection, manufacturing, surveillance of internet traffic and real-time business intelligence.

Complex Event Processing is quite different from relational database querying. Instead of one-time queries against finite, stored data sets, you have continuous standing queries against continuous data streams. You may also have near-zero latency requirements.

Challenges that are faced within the field of stream processing and complex event processing include:

  • Performance and Scalability - including techniques such as clustering and distributed processing
  • Out-of-order data arrival
  • Data quality problems - duplicate data, retroactive corrections
  • Time discontinuities such as daylight saving time and leap seconds
  • Combining real-time and historical data

A CEP Engine takes care of at least some of these problems, making building robust stream processing applications much easier.

StreamInsight architecture

Events

Events represent data that change over time: for example stock prices, temperatures, Twitter messages or credit card transactions. Generally StreamInsight is used to process low-level Primitive Events into higher-level Complex Events, but it can also be used for simple operations such as filtering.

Events in StreamInsight are of two kinds:

  • INSERT Events, which are used to submit data
  • CTI (Current Time Increment) Events, a kind of system Event that provides the current time of the source and indicates the completeness of the Events submitted so far (there will be no more Events with timestamps older than the current time).

An INSERT Event in StreamInsight can be of three different temporal types:

  • Point Events, which have no duration
  • Interval Events, which have a duration
  • Edge Events, which usually represent cases where the duration is still unknown at Event start time

The INSERT Events consist of a header, which depends on the temporal type, and a body called the Payload. The Payload is a user-defined class that contains the actual data. Only a limited number of simple data types are supported for the members of the Payload.

Streams

All data in StreamInsight is organized into streams. The streams represent a continuous and possibly endless flow of events. A single stream can only contain one kind of event - so if you have different kinds of events you need multiple streams.

Input Adapters

Input Adapters are responsible for receiving Events from external sources and putting them into the Stream in a format that is consumable by the queries. An Input Adapter that always emits the same type of Event is called a typed Input Adapter. There are also untyped Input Adapters that can produce different kinds of Events. However, at run time each instance of an untyped Input Adapter can only produce one kind of Event.

Output Adapters

Output Adapters are responsible for outputting the Events to external destinations (sinks). They are in many ways similar to Input Adapters, except for the direction of data flow.

In the November CTP of StreamInsight, only one output adapter can be connected to a single query, but there are indications that future versions will allow multiple output adapters per query.

Query Templates

Query Templates define the business logic of your CEP application. They are defined declaratively using LINQ expressions. A number of computational operations are supported:

  • Filtering (WHERE-clause)
  • Projecting (SELECT-clause)
  • Joins (Cross Joins and Full Inner Joins)
  • Unions
  • Aggregations over a time window (SUM, MAX, MIN, COUNT, ...)
  • Group and Apply (computations in separate groups)
  • Top K within a time window (with ORDERBY)
  • Temporal alterations
  • User-defined scalar functions (Filters and Projects)
  • User-defined aggregates (Scalar)
  • User-defined operators (Set based)

Query Templates are only used in the explicit development model.

Query Instances

Binding a query template with specific input and output adapters together with configuration information results in a query instance in the CEP server. A Query Instance can be started, stopped and monitored through Diagnostic Views.

In the implicit and IObservable/IObserver development models, Query Instances are created when you bind your input, output and LINQ query together.

Diagnostic Views

Diagnostic Views provide information about objects on the CEP server or statistics on the CEP server itself. They give information about the state of Query Instances, latency times, number of Events and memory consumption.

Diagnostic Views are acquired by calling the GetDiagnosticView method. To get updated information, you need to call the method repeatedly.
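As a minimal sketch (it follows the same pattern as the runQuery method shown later in this article, and assumes a started Query object named query), polling a Diagnostic View could look like this:

// Poll the diagnostic view of a running query every 100 ms (assumes a started Query named "query")
DiagnosticView diagnosticView;
do
{
    Thread.Sleep(100);
    diagnosticView = query.Application.Server.GetDiagnosticView(query.Name);
    Console.WriteLine("Query state: " + diagnosticView[DiagnosticViewProperty.QueryState]);
} while ((string)diagnosticView[DiagnosticViewProperty.QueryState] == "Running");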

Example - some queries against financial data

The following section describes an example of running queries against stock quotes. I've chosen the implicit development model, because it is the simplest to start with. I've also created some example files that contain real historical data from 2009. You could easily create your own historical data files, or replace the input with another adapter that provides live data (see, for example, my YahooFinance adapter article).

The data provided in this example is:

  • Ericsson Stock Quotes from New York 2009 in US Dollars (USD)
  • Ericsson Stock Quotes from Stockholm 2009 in Swedish Krona (SEK)
  • Nokia Stock Quotes from New York 2009 in US Dollars (USD)
  • NASDAQ Index from 2009 in US Dollars (USD)
  • Exchange Rate between US Dollars and Swedish Krona for 2009

Note that the stock quotes are only daily values. In a real-world situation you would probably work with a much finer time-scale such as seconds or milliseconds.

The example also contains exchange rates for US Dollars/Swedish Krona. The purpose is to show a scenario where the same stock is traded on multiple markets, in multiple currencies, and where there may be an arbitrage opportunity (arbitrage is the practice of making a risk-free profit through a combination of matching deals that exploit a pricing difference). However, for a true comparison you would need the stock quotes and the exchange rate at exactly (or almost exactly) the same time, which is not the case in my example.

Prerequisites

To run this example you will need the following:

  • Microsoft SQL Server 2008 R2 StreamInsight (Version 1 or later)
  • Visual Studio 2008 (or preferably Visual Studio 2010)

StreamInsight can be downloaded from the link at the end of this article. There you can also find a link to the complete source code of this example.

Step 1 - Creating your Visual Studio project

Now let's get started writing a StreamInsight application. You can create any kind of .NET application, but for simplicity let's choose a Console Application (Visual C#).

Note: In the current version of StreamInsight (April 2010), .NET 4.0 is not supported, so you must target .NET 3.5.

The next step is to add references to the StreamInsight assemblies. You should add references to:

  • Microsoft.ComplexEventProcessing
  • Microsoft.ComplexEventProcessing.Adapters
  • Microsoft.ComplexEventProcessing.Diagnostics

By default they can be found in the folder "C:\Program Files\Microsoft StreamInsight 1.0\Bin". After adding them, you should see them listed under References in Solution Explorer.

In your program code you also need to add a few using-statements:
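The exact set depends on which classes you end up using; for the code in this article, something like the following should be sufficient:

using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Adapters;
using Microsoft.ComplexEventProcessing.Diagnostics;
using Microsoft.ComplexEventProcessing.Linq;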

Now we have all the prerequisites in place for starting to write our code.

Step 2 - Defining the Event Payload

The input is provided in historical stock price files in CSV format: a header row with the column names (Date, Open, High, Low, Close, Volume, Adj Close), followed by one row per trading day.
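Purely as an illustration of the layout (the figures below are placeholders, not the actual 2009 quotes):

Date,Open,High,Low,Close,Volume,Adj Close
2009-01-02,58.00,59.50,57.25,59.00,1234500,59.00
2009-01-05,59.10,60.25,58.75,60.00,2345600,60.00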

First we must define a class holding our Event data (the "Payload"). It is rather straightforward to define, but remember that you can only use primitive types such as numbers, strings and DateTime values in the class definition:

public class StockQuote
{
    public string StockID { get; set; }
    public string FieldID { get; set; }
    public double Value { get; set; }
}

The class has a StockID (which tells the name of the stock the quote refers to), a FieldID (which tells the name of the column) and finally a numerical Value. The timestamp is not part of the Payload; it is set on the Event header when the Event is created. The StockID cannot be read from the file, so it has to be passed to the input adapter somehow; in this example it comes from the adapter configuration, as shown in the next step.

Step 3 - Reading the input

To read our data we'll need an appropriate input adapter. Adapters are made up of a minimum of three classes:

  • Configuration class
  • Factory class
  • Adapter class

The configuration class contains instance-specific settings for your input adapter. It is mandatory, but it can be an empty class with no members if you don't need any configuration. We'll put in a couple of properties to set the filename to be read, which column names to include, and so on.

public class StockQuoteInputConfig
{
    public string ID { get; set; }
    public string Filename { get; set; }
    public string[] ColumnNames { get; set; }
    public DateTime StartDate { get; set; }
    public int Interval { get; set; }
}

The factory class is responsible for creating instances of the adapter class. If you have an adapter that works on point, interval and edge events, then it is the responsibility of the factory class to instantiate the correct adapter class. We'll just support point events in this example.

public class StockQuoteInputFactory : ITypedInputAdapterFactory<StockQuoteInputConfig>
{
    public InputAdapterBase Create<TPayload>(StockQuoteInputConfig config, EventShape eventShape)
    {
        // Only support the point event model
        if (eventShape == EventShape.Point)
            return new StockQuoteTypedPointInput(config);
        else
            return default(InputAdapterBase);
    }

    public void Dispose()
    {
    }
}

Finally, we need the adapter class that does the actual work of reading the file and creating events. Pull-based adapters such as this one mostly consist of a loop that checks for adapter state changes and new data.

public class StockQuoteTypedPointInput : TypedPointInputAdapter<StockQuote>
{
    public readonly static IFormatProvider QuoteFormatProvider = CultureInfo.InvariantCulture.NumberFormat;

    private PointEvent<StockQuote> pendingEvent;
    private StockQuoteInputConfig _config;
    private SortedList<DateTime, string[]> quotes;
    private IEnumerator<KeyValuePair<DateTime, string[]>> quoteEnumerator;
    private SortedList<string, int> columns;

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="config">Configuration for this adapter</param>
    public StockQuoteTypedPointInput(StockQuoteInputConfig config)
    {
        _config = config;

        // Read the header row and remember the position of each column
        var streamReader = new StreamReader(config.Filename);
        var line = streamReader.ReadLine();
        var values = line.Split(',');
        columns = new SortedList<string, int>(values.Length);
        for (int i = 0; i < values.Length; i++)
            columns.Add(values[i], i);

        // Read the remaining rows, keyed by date (the first column)
        quotes = new SortedList<DateTime, string[]>();
        while (!streamReader.EndOfStream)
        {
            line = streamReader.ReadLine();
            values = line.Split(',');
            var date = DateTime.Parse(values[0], QuoteFormatProvider);
            quotes.Add(date, values);
        }
        quoteEnumerator = quotes.GetEnumerator();
        streamReader.Close();
    }

    public override void Start()
    {
        ProduceEvents();
    }

    public override void Resume()
    {
        ProduceEvents();
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
    }

    /// <summary>
    /// Main loop
    /// </summary>
    private void ProduceEvents()
    {
        var currEvent = default(PointEvent<StockQuote>);
        EnqueueCtiEvent(new DateTimeOffset(_config.StartDate, TimeSpan.Zero));
        try
        {
            // Loop until stop signal
            while (AdapterState != AdapterState.Stopping)
            {
                if (pendingEvent != null)
                {
                    currEvent = pendingEvent;
                    pendingEvent = null;
                }
                else
                {
                    if (quoteEnumerator.MoveNext())
                    {
                        try
                        {
                            var quote = quoteEnumerator.Current;
                            var date = new DateTimeOffset(quote.Key, TimeSpan.Zero);
                            foreach (var columnName in _config.ColumnNames)
                            {
                                var i = columns[columnName];
                                var value = double.Parse(quote.Value[i], QuoteFormatProvider);

                                // Produce INSERT event
                                currEvent = CreateInsertEvent();
                                currEvent.StartTime = date;
                                currEvent.Payload = new StockQuote
                                {
                                    StockID = _config.ID,
                                    FieldID = columnName,
                                    Value = value
                                };
                                pendingEvent = null;
                                //PrintEvent(currEvent);
                                var result = Enqueue(ref currEvent);
                                if (result == EnqueueOperationResult.Full)
                                {
                                    PrepareToResume(currEvent);
                                    Ready();
                                    return;
                                }
                            }
                            // Also send a CTI event
                            EnqueueCtiEvent(date);
                        }
                        catch
                        {
                            // Error handling should go here
                        }
                        Thread.Sleep(_config.Interval);
                    }
                    else
                    {
                        break;
                    }
                }
            }
            if (pendingEvent != null)
            {
                currEvent = pendingEvent;
                pendingEvent = null;
            }
            PrepareToStop(currEvent);
            Stopped();
        }
        catch (AdapterException e)
        {
            // Error handling should go here
        }
    }

    private void PrepareToStop(PointEvent<StockQuote> currEvent)
    {
        //EnqueueCtiEvent(DateTime.Now);
        if (currEvent != null)
        {
            // Do this to avoid memory leaks
            ReleaseEvent(ref currEvent);
        }
    }

    private void PrepareToResume(PointEvent<StockQuote> currEvent)
    {
        pendingEvent = currEvent;
    }
}

 

Step 4 - Handling the output

For the purposes of this article, we will create an output adapter that simply displays the events on the console.

Similarly to the input adapter, an output adapter is made up of a minimum of three classes:

  • Configuration class
  • Factory class
  • Adapter class

The configuration in this case is simple. We don't really need any configuration, so we can just make an empty class.

public class StockQuoteOutputConfig
{
}

The factory is similar to the input adapter factory. Here we will also only support point events.

public class StockQuoteOutputFactory : ITypedOutputAdapterFactory<StockQuoteOutputConfig>
{
    public OutputAdapterBase Create<TPayload>(StockQuoteOutputConfig config, EventShape eventShape)
    {
        // Only support the point event model
        if (eventShape == EventShape.Point)
            return new StockQuoteTypedPointOutput(config);
        else
            return default(OutputAdapterBase);
    }

    public void Dispose()
    {
    }
}

Finally, the adapter class will have a main loop that receives events and displays them on the console.

public class StockQuoteTypedPointOutput : TypedPointOutputAdapter<StockQuote>
{
    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="config">Adapter configuration</param>
    public StockQuoteTypedPointOutput(StockQuoteOutputConfig config)
    {
    }

    public override void Start()
    {
        ConsumeEvents();
    }

    public override void Resume()
    {
        ConsumeEvents();
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
    }

    /// <summary>
    /// Main loop
    /// </summary>
    private void ConsumeEvents()
    {
        PointEvent<StockQuote> currEvent;
        DequeueOperationResult result;
        try
        {
            // Run until stop state
            while (AdapterState != AdapterState.Stopping)
            {
                result = Dequeue(out currEvent);
                // Take a break if queue is empty
                if (result == DequeueOperationResult.Empty)
                {
                    PrepareToResume();
                    Ready();
                    return;
                }
                else
                {
                    // Write to console
                    if (currEvent.EventKind == EventKind.Insert)
                    {
                        Console.WriteLine("Output: " +
                            currEvent.StartTime + " " +
                            currEvent.Payload.StockID + " " +
                            currEvent.Payload.FieldID + " " +
                            currEvent.Payload.Value.ToString("f2"));
                    }
                    ReleaseEvent(ref currEvent);
                }
            }
            result = Dequeue(out currEvent);
            PrepareToStop(currEvent, result);
            Stopped();
        }
        catch (AdapterException e)
        {
        }
    }

    private void PrepareToResume()
    {
    }

    private void PrepareToStop(PointEvent<StockQuote> currEvent, DequeueOperationResult result)
    {
        if (result == DequeueOperationResult.Success)
        {
            ReleaseEvent(ref currEvent);
        }
    }
}

As an alternative to writing an output adapter class, the IObservable/IObserver development model lets you consume the query results with an observer. The IObserver<T> interface has three methods that you need to implement:

  • OnNext, which is called when you receive an event
  • OnCompleted, which is called when the query is finished (if it ever finishes)
  • OnError, which is called when an error occurs

In an OnNext implementation you would typically just call Console.WriteLine to display the Event, as sketched below.
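A minimal sketch of such an observer follows. Exactly how it is attached to a query depends on the StreamInsight version, so treat this as an illustration rather than part of the downloadable sample:

// Minimal console observer for StockQuote results (illustration only)
public class ConsoleQuoteObserver : IObserver<StockQuote>
{
    public void OnNext(StockQuote value)
    {
        // Called for each event produced by the query
        Console.WriteLine("Output: " + value.StockID + " " + value.FieldID + " " + value.Value.ToString("f2"));
    }

    public void OnCompleted()
    {
        // Called when the query is finished (if it ever finishes)
        Console.WriteLine("Query completed");
    }

    public void OnError(Exception error)
    {
        // Called when an error occurs
        Console.WriteLine("Query failed: " + error.Message);
    }
}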

 

Step 5 - Implementing some queries

Now it is time to test some queries against the sample input. For simplicity, each query has been wrapped in a function that follows a similar pattern: one or more input streams are passed in, together with an Application object that describes the StreamInsight execution environment.

private static void myExample(CepStream<StockQuote> cepStream, Application application)
{
    // Return only "Close" values using a where-clause
    var newCepStream = ...

    runQuery(newCepStream, application);
}

The query is instantiated and run in the method runQuery(), which is the same for all examples. It calls ToQuery() to create the query instance, starts the query, loops until the query signals that it has finished, and finally stops the query.

private static void runQuery(CepStream<StockQuote> cepStream, Application application)
{
    // Configure output adapter
    var outputConfig = new StockQuoteOutputConfig();

    // Create query and bind to the output adapter
    var query = cepStream.ToQuery(application, Guid.NewGuid().ToString(), "description",
        typeof(StockQuoteOutputFactory), outputConfig, EventShape.Point, StreamEventOrder.ChainOrdered);

    // Start query
    query.Start();

    // Wait until the query changes state
    DiagnosticView diagnosticView;
    do
    {
        Thread.Sleep(100);
        diagnosticView = query.Application.Server.GetDiagnosticView(query.Name);
    } while ((string)diagnosticView[DiagnosticViewProperty.QueryState] == "Running");

    // Stop query
    query.Stop();
}

Filtering

Filtering of Events is done with a simple WHERE-clause. Below is an example that keeps only the closing prices (where FieldID equals "Close").

private static void filterExample(CepStream<StockQuote> cepStream, Application application)
{
    // Return only "Close" values using a where-clause
    var filteredCepStream = from e in cepStream
                            where e.FieldID == "Close"
                            select e;

    runQuery(filteredCepStream, application);
}

Cross join and projection

The next example shows how to combine input streams by using a cross join. A cross join combines each Event from the first stream with each Event from the second stream that falls within the same time frame. In LINQ, the syntax for a cross join is to write two from-clauses.

In our first query, ericRecalcCepStream, we combine Ericsson stock price in US Dollars with Exchange Rate (from the same day) and recalculate the price into Swedish Krona. In our second query, ericCompareCepStream, we take the difference between the recalculated price and the Ericsson stock price in Swedish Krona. These kinds of calculations are examples of projections.

private static void crossJoinProjectionExample(CepStream<StockQuote> ericSEKStream, CepStream<StockQuote> ericUSDStream, CepStream<StockQuote> USDSEKStream, Application application)
{
    var ericRecalcCepStream = from eUSD in ericUSDStream
                              from eXch in USDSEKStream
                              where eUSD.FieldID == "Close"
                              select new StockQuote()
                              {
                                  StockID = "ERIC-Recalc",
                                  FieldID = "Close",
                                  Value = eUSD.Value * eXch.Value // Convert ERIC USD quote to SEK
                              };

    var ericCompareCepStream = from eRecalc in ericRecalcCepStream
                               from eSEK in ericSEKStream
                               where eSEK.FieldID == "Close"
                               select new StockQuote()
                               {
                                   StockID = "ERIC-Compare",
                                   FieldID = "Diff",
                                   Value = eSEK.Value - eRecalc.Value
                               };

    runQuery(ericCompareCepStream, application);
}

Windowing and aggregation

This query example uses a hopping window. A hopping window has a time span, in this case 7 days, and a "hop size", which is the amount of time the window moves in each step, in this case 1 day. The result is that the first window spans January 1st to January 7th, the next window January 2nd to January 8th, and so on.

Over each window we can run aggregate functions. This example takes the average closing price of each 7-day window.

private static void avgExample(CepStream<StockQuote> cepStream, Application application)
{
    var avgCepStream = from w in cepStream.Where(e => e.FieldID == "Close")
                           .HoppingWindow(TimeSpan.FromDays(7), TimeSpan.FromDays(1), HoppingWindowOutputPolicy.ClipToWindowEnd)
                       select new StockQuote()
                       {
                           StockID = "ERIC",
                           FieldID = "7-day avg",
                           Value = w.Avg(e => e.Value)
                       };

    runQuery(avgCepStream, application);
}

Group and apply example

The previous query calculated the average only for the closing prices. By using a GROUP BY-clause, we can calculate the 7-day average for all available FieldIDs (Open, Close, Volume, etc) in a single query. Grouping is a very powerful operation that is useful when you monitor many similar sources.

private static void groupApplyExample(CepStream<StockQuote> ericUSDStream, Application application)
{
    var ericUSDGroupCepStream = from e in ericUSDStream
                                group e by e.FieldID into eGroup
                                from w in eGroup.HoppingWindow(TimeSpan.FromDays(7), TimeSpan.FromDays(1), HoppingWindowOutputPolicy.ClipToWindowEnd)
                                select new StockQuote()
                                {
                                    StockID = "ERIC 7-day avg",
                                    FieldID = eGroup.Key,
                                    Value = w.Avg(e => e.Value)
                                };

    runQuery(ericUSDGroupCepStream, application);
}

Looking backwards in time

The final example query checks whether the stock price has dropped more than 10% over exactly 7 days. It demonstrates how you can use the ShiftEventTime function to compare an input stream with itself as it was 7 days earlier. A cross join combines the original and the time-shifted stream, and a final filter keeps only the cases where the price has dropped by more than 10%.

private static void bigLooserExample(CepStream<StockQuote> cepStream, Application application)
{
    var bigLooserCepStream = (from e1 in cepStream
                              from e2 in cepStream.ShiftEventTime(e => e.StartTime.AddDays(7))
                              where e1.FieldID == "Close" && e2.FieldID == "Close"
                              select new StockQuote()
                              {
                                  StockID = "ERIC > 10% drop",
                                  FieldID = "Close",
                                  Value = (e1.Value - e2.Value) / e2.Value * 100
                              }).Where(e => e.Value < -10);

    runQuery(bigLooserCepStream, application);
}

This time-shifting example is provided for educational purposes. In a real-world scenario, it would often be more interesting to detect stock prices that have dropped x% at some point within the last 7 days; in that case you would use a window function instead of time shifting.
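As a rough sketch of that idea (this is my own assumption, not code from the original sample): aggregate over a 7-day hopping window and flag windows where the spread between the highest and lowest closing prices exceeds 10% of the maximum. Note that window aggregates ignore the ordering of events inside the window, so this detects large swings rather than true drops; a precise solution would need a user-defined aggregate or operator.

private static void windowSwingExample(CepStream<StockQuote> cepStream, Application application)
{
    // Flag 7-day windows where the closing price range exceeds 10% of the window maximum
    var swingCepStream = (from w in cepStream.Where(e => e.FieldID == "Close")
                              .HoppingWindow(TimeSpan.FromDays(7), TimeSpan.FromDays(1), HoppingWindowOutputPolicy.ClipToWindowEnd)
                          select new StockQuote()
                          {
                              StockID = "ERIC > 10% swing",
                              FieldID = "Close",
                              Value = (w.Max(e => e.Value) - w.Min(e => e.Value)) / w.Max(e => e.Value) * 100
                          }).Where(e => e.Value > 10);

    runQuery(swingCepStream, application);
}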

Step 6 - Tying it all together

Now that we have our input and output adapters as well as our queries, we can tie it all together. The easiest way is to run it within an in-process StreamInsight host. That can be created in the following way.

var server = Server.Create("si");
var application = server.CreateApplication("StockInsight");

Then we instantiate our adapters using the following code.

// Determine path for historical data
var dataPath = Path.GetDirectoryName(Environment.GetCommandLineArgs()[0]) + "\\HistoricalData\\";

// Configuration for input
var ericSEKConfig = new StockQuoteInputConfig
{
    ID = "ERIC-SEK",
    Filename = dataPath + "eric_b_sek_2009.csv",
    ColumnNames = new string[] { "Open", "High", "Low", "Close", "Volume", "Adj Close" },
    StartDate = new DateTime(2009, 01, 01),
    Interval = 0
};

// Configuration for input
var ericUSDConfig = new StockQuoteInputConfig
{
    ID = "ERIC-USD",
    Filename = dataPath + "eric_b_usd_2009.csv",
    ColumnNames = new string[] { "Open", "High", "Low", "Close", "Volume", "Adj Close" },
    StartDate = new DateTime(2009, 01, 01),
    Interval = 0
};

// Instantiate input adapters
var ericSEKStream = CepStream<StockQuote>.Create("ericSEKStream", typeof(StockQuoteInputFactory), ericSEKConfig, EventShape.Point);
var ericUSDStream = CepStream<StockQuote>.Create("ericUSDStream", typeof(StockQuoteInputFactory), ericUSDConfig, EventShape.Point);
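The cross join example also needs the exchange rate stream (USDSEKStream), which follows the same pattern. A sketch of its setup is shown below; the filename and column name are assumptions on my part (the actual values are in the downloadable source code):

// Configuration for the USD/SEK exchange rate input (filename and column name are assumptions)
var USDSEKConfig = new StockQuoteInputConfig
{
    ID = "USD-SEK",
    Filename = dataPath + "usd_sek_2009.csv",
    ColumnNames = new string[] { "Close" },
    StartDate = new DateTime(2009, 01, 01),
    Interval = 0
};

// Instantiate the exchange rate input adapter
var USDSEKStream = CepStream<StockQuote>.Create("USDSEKStream", typeof(StockQuoteInputFactory), USDSEKConfig, EventShape.Point);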

Then we can run our examples by calling the appropriate methods.

// Run examples
filterExample(ericUSDStream, application);
crossJoinProjectionExample(ericSEKStream, ericUSDStream, USDSEKStream, application);
avgExample(ericUSDStream, application);
groupApplyExample(ericUSDStream, application);
bigLooserExample(ericUSDStream, application);

Source Code

The complete source code for this example is available via the link in the Resources section at the end of this article.

Note that I have also published other samples there. Download and open the whole solution in Visual Studio; the source code for this article is located in the project StockInsightImplicit.

Conclusion

This article has described how to develop a simple CEP application with StreamInsight that runs some simple queries against stock quotes. It showed how to build an input adapter that reads stock quotes from a file and an output adapter that displays the results on screen. Once you have the adapters, it is easy to experiment with your own queries.

A very useful feature of StreamInsight is the ability to develop user-defined functions and aggregates. That will be described in a later article.

Resources

About the Author

Johan Åhlén is an independent Business Intelligence professional, all-round data enthusiast, inventor, musician, serial entrepreneur and information archeologist. He has coded since he was 9, around the time MS DOS 2.0 was released. Johan has been working with SQL Server since version 6.5 and is one of the leaders of the Swedish SQL Server User Group. Although his work is often in the borderland between technology and business (or psychology), he is eagerly following new technologies. Feel free to contact Johan through his blog.

The Series

This article is part of a series that is designed to help you quickly understand and begin using StreamInsight. You can view the entire series here.

