
StreamInsight – the development models

By Johan Åhlén,

Editor: I'm leaving this in reviewed status. Update when you can.

A few comments inline: Avoid bolding so much. It lessens the value of the times you use it. Really you should not be doing it more than once or twice in an article. Use it for extreme emphasis. I think that the comments in the iObservable section apply to all three. You need to better set up the model and give reasons why it's used, the pros/cons or more info about what it implies to use this model. Then go into more details like the references, payload, sample.

Introduction

This article is the third in a series describing Microsoft SQL Server 2008 R2 StreamInsight. The earlier articles gave a background on StreamInsight and Complex Event Processing and showed how to get started developing a simple application. This article describes the fundamentals of the three development models in StreamInsight: how they work and the scenarios in which each should be used.

The entire series is here: Stairway to StreamInsight

Background

In StreamInsight there are different deployment options and development models. The three different development models support different feature sets and deployment options as seen in the table below.

Editor: I think you want to re-order these in the order you are talking about them. Also, is there more information about the implications of these deployment options or features we can give the person here?

Development models overview

Development Model     | Deployment Options   | Features
Explicit              | In-process or hosted | Full featured model.
Implicit              | In-process only      | Good for debugging and testing adapters.
IObservable/IObserver | In-process or hosted | Limited access to API, but easier integration with existing .NET applications.

You can deploy your application in-process, which means that a StreamInsight server is instantiated and embedded within your application. The other deployment option is to run your application within the StreamInsight Host, which is a Windows Service that comes with the StreamInsight installation.

Editor: What does this mean for someone? Does it help them make a decision? It seems like a fact listed that isn't providing much info

The IObservable/IObserver development model

The IObservable/IObserver model is the easiest development model. It is based on the IObservable/IObserver design pattern, a general pattern in which observers are objects that want to be notified of important state changes in observable objects. In StreamInsight, the observables are event sources and the observers are event sinks.

Editor: It feels like you're jumping into some details here without a flow. It's not necessarily clear here what all these sections under this development model mean. I think you need to step back and say "if someone read part 2, does this lead them along?" I'm not sure how the references, payload, relate here. It's a question of flow here rather than info. I'm not sure why I'd use this.

References

When using the IObservable/IObserver development model you should reference the following DLLs:

  • Microsoft.ComplexEventProcessing.dll
  • Microsoft.ComplexEventProcessing.Adapters.Observable.dll

You should add the following using-statements in your C# files:

  • using Microsoft.ComplexEventProcessing
  • using Microsoft.ComplexEventProcessing.Linq
  • using Microsoft.ComplexEventProcessing.Adapters.Observable

Defining the payload

Time information, such as start time and end time, is usually handled by the event headers. In the IObservable interface, however, you do not have direct access to the event headers. You therefore need to pass the time information along in the payload if you want to make use of it.

Example payload:

public class StockQuote
{
    public DateTime TimeStamp { get; set; }
    public string StockID { get; set; }
    public string FieldID { get; set; }
    public double Value { get; set; }
}

Creating the observable

There are two ways to create the observable. You can either implement the IObservable interface yourself or take an IEnumerable and convert it to an IObservable.

The IObservable interface has only one method to implement: Subscribe. The Subscribe method is called with one argument, an observer, and should typically add the observer to a list of observers that are passed any events from the observable. The return value is an optional IDisposable that is responsible for cleaning up any resources that have been allocated. The IObservable interface can be used for both push and pull based scenarios.

Below is an example of implementing the IObservable interface:

Editor: I think you want another sentence or two that tells what this example is. Or where it's from (link)

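using System;
using System.Collections.Generic;

public class Observable : IObservable<StockQuote>
{
    // Assumption for illustration: the events come from a sequence supplied by the caller
    private readonly IEnumerable<StockQuote> source;
    private readonly List<IObserver<StockQuote>> observers = new List<IObserver<StockQuote>>();

    public Observable(IEnumerable<StockQuote> source)
    {
        this.source = source;
    }

    public IDisposable Subscribe(IObserver<StockQuote> observer)
    {
        observers.Add(observer);
        return null; // Nothing to clean up in this simple example
    }

    public void ProduceEvents()
    {
        // Pass every available event to all subscribed observers
        foreach (StockQuote e in source)
        {
            foreach (var observer in observers)
                observer.OnNext(e);
        }

        // Signal the end of the stream
        foreach (var observer in observers)
            observer.OnCompleted();
    }
}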
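Because the return value of Subscribe is responsible for cleanup, a common variation is to return an object that removes the observer again when it is disposed. The following is a minimal sketch of that idea using only the .NET base class library. The names QuoteSource, Publish, ObserverCount and CountingObserver are hypothetical and are not part of StreamInsight.

```csharp
using System;
using System.Collections.Generic;

public class StockQuote
{
    public DateTime TimeStamp { get; set; }
    public string StockID { get; set; }
    public string FieldID { get; set; }
    public double Value { get; set; }
}

// Hypothetical observable whose subscriptions can be cancelled.
public class QuoteSource : IObservable<StockQuote>
{
    private readonly List<IObserver<StockQuote>> observers = new List<IObserver<StockQuote>>();

    public int ObserverCount { get { return observers.Count; } }

    public IDisposable Subscribe(IObserver<StockQuote> observer)
    {
        if (!observers.Contains(observer))
            observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Pushes one event to every currently subscribed observer.
    public void Publish(StockQuote quote)
    {
        // Iterate over a copy so that an observer may unsubscribe inside OnNext.
        foreach (var observer in observers.ToArray())
            observer.OnNext(quote);
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<StockQuote>> observers;
        private readonly IObserver<StockQuote> observer;

        public Unsubscriber(List<IObserver<StockQuote>> observers, IObserver<StockQuote> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose()
        {
            // Removing an observer that is no longer in the list is harmless.
            observers.Remove(observer);
        }
    }
}

// Hypothetical observer that only counts the events it receives.
public class CountingObserver : IObserver<StockQuote>
{
    public int Count { get; private set; }
    public void OnNext(StockQuote value) { Count++; }
    public void OnCompleted() { }
    public void OnError(Exception error) { }
}
```

Disposing the object returned by Subscribe stops further OnNext calls to that observer, while other observers keep receiving events.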

Below is an example of using an IEnumerable instead of implementing the IObservable interface:

StockQuote[] quotes = ...;
IObservable<StockQuote> observable = quotes.ToObservable();

Creating the observer

The observer implements the interface IObserver, which has three methods: OnNext, OnCompleted and OnError. The OnNext method is called for each event. OnCompleted is called to signal the end of stream. OnError is called upon errors with information about the exception.

Editor: Ditto here, what does this mean? When I read http://www.sqlservercentral.com/articles/StreamInsight/69593/, I'm not sure where this fits in.

Below is an example of implementing an observer:

public class Observer : IObserver<StockQuote>
{
    public void OnCompleted()
    {
        // TODO: Add any code here to clean up resources
    }

    public void OnError(Exception error)
    {
        throw error;
    }

    public void OnNext(StockQuote value)
    {
        Console.WriteLine("Output: " +
            value.TimeStamp + " " +
            value.StockID + " " +
            value.FieldID + " " +
            value.Value.ToString("f2"));
    }
}

Creating the queries

To be able to do queries on your data you need to convert from IObservable to CepStream. That is accomplished by calling the extension method ToCepStream, as in the example below.

Editor: Why? Short explanation of the conversion

CepStream<StockQuote> stream = observable.ToCepStream<StockQuote>(e => new DateTimeOffset(e.TimeStamp, TimeSpan.Zero));

The ToCepStream method adds an event header to your data and therefore needs the time information. You have a few different versions of the ToCepStream method depending on if it should be a point event or interval event and how you want CTI events to be generated (CTI events were explained in the previous article in this series).
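One detail worth watching when the payload carries a DateTime: the .NET constructor DateTimeOffset(DateTime, TimeSpan) throws an ArgumentException if the DateTime has Kind set to Local and the supplied offset differs from the local time zone offset. The sketch below shows one possible way to normalise a timestamp before it is stored in the payload, so that a later conversion with TimeSpan.Zero does not fail. The helper name QuoteTime.NormalizeToUtc is hypothetical, and treating timestamps of unspecified kind as UTC is an assumption you may need to change.

```csharp
using System;

public static class QuoteTime
{
    // Hypothetical helper: returns the same instant with Kind set to Utc.
    public static DateTime NormalizeToUtc(DateTime timeStamp)
    {
        // Local times are converted to UTC. Unspecified times are
        // assumed to be UTC already (an assumption of this sketch).
        return timeStamp.Kind == DateTimeKind.Local
            ? timeStamp.ToUniversalTime()
            : DateTime.SpecifyKind(timeStamp, DateTimeKind.Utc);
    }
}
```

A payload could then be populated with TimeStamp = QuoteTime.NormalizeToUtc(rawTime), where rawTime stands for whatever timestamp your source delivers.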

Editor: If this adds the event header, do you need the payload section above?

The query is written in LINQ in the same way regardless of the development model, so we won't go into any details here. There are some query examples in the earlier article, "Getting started with StreamInsight". Let's just assume the query is called "myquery". The simplest possible query looks like this:

var myquery = from e in stream
              select e;

After having defined your query, you need to convert it into an IObservable and then attach your observer to it. Below is an example.

var observer = new Observer();
var disposable = myquery.ToObservable<StockQuote>().Subscribe(observer);

Monitoring the query

The query will start automatically once you add the observer. It will keep running until the observable signals that it has completed.

In the IObservable/IObserver model you do not have access to the StreamInsight Diagnostic Views like you have in the other development models. Instead you have the methods OnCompleted and OnError where you can put code to pass on the information to the rest of your program.

Editor: I think you need to link to somewhere for Diagnostic views. Or mention they are covered later.

Usage scenarios

In the IObservable/IObserver model the programmer is exposed to a minimum of the StreamInsight API. This model also has the advantage that existing classes, such as a User Control or a Window, can easily be made into an observer by subclassing them and implementing the IObserver interface.
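As a rough illustration of turning an existing class into an observer, the sketch below subclasses a stand-in class and implements IObserver on the subclass. QuoteBoard and ObservingQuoteBoard are hypothetical names used in place of a real User Control or Window, and the code uses only the .NET base class library.

```csharp
using System;
using System.Collections.Generic;

public class StockQuote
{
    public DateTime TimeStamp { get; set; }
    public string StockID { get; set; }
    public string FieldID { get; set; }
    public double Value { get; set; }
}

// Hypothetical existing class, standing in for a User Control or Window.
public class QuoteBoard
{
    private readonly Dictionary<string, double> latest = new Dictionary<string, double>();

    public void Show(string stockId, double value)
    {
        latest[stockId] = value;
    }

    public bool TryGetLatest(string stockId, out double value)
    {
        return latest.TryGetValue(stockId, out value);
    }
}

// The subclass adds the observer role without changing the base class.
public class ObservingQuoteBoard : QuoteBoard, IObserver<StockQuote>
{
    public bool Completed { get; private set; }
    public Exception LastError { get; private set; }

    public void OnNext(StockQuote value)
    {
        Show(value.StockID, value.Value);
    }

    public void OnCompleted()
    {
        Completed = true;
    }

    public void OnError(Exception error)
    {
        // Keep the error so that the rest of the program can inspect it.
        LastError = error;
    }
}
```

A real control would probably also need to marshal these calls onto its UI thread, which this sketch leaves out.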

Editor: I think this needs to be moved up, or integrated earlier (see previous comments)

The example here describes how to run your queries in-process. It is also possible in the IObservable/IObserver model to run your queries in an external process by following the Explicit development model and using the classes and extension methods under Microsoft.ComplexEventProcessing.Adapters.Observable to interface with your observables/observers. That way you could mix adapters of different development models.

Editor: Why in v out of process?

The Implicit development model

The implicit model has the same adapter structure as the explicit model, but the query control is simplified and limited compared to the explicit model. It automatically instantiates the StreamInsight runtime environment within the current process.

Editor: The comparison to external doesn't make much sense when you haven't talked about external yet

In the implicit development model you can get access to the in-memory server and application through the query object. This makes the model suitable for development and debugging of StreamInsight applications.

References

When using the implicit development model you should reference the following DLLs:

  • Microsoft.ComplexEventProcessing.dll
  • Microsoft.ComplexEventProcessing.Adapters.dll
  • Microsoft.ComplexEventProcessing.Diagnostics.dll

You should add the following using-statements in your C# files:

  • using Microsoft.ComplexEventProcessing
  • using Microsoft.ComplexEventProcessing.Linq
  • using Microsoft.ComplexEventProcessing.Adapters
  • using Microsoft.ComplexEventProcessing.Diagnostics

Defining the payload

The payload for the implicit model looks the same as for the IObservable/IObserver model except that you should not put any time information in there that is intended for the event header.

Payload example:

public class StockQuote
{
    public string StockID { get; set; }
    public string FieldID { get; set; }
    public double Value { get; set; }
}

Creating the input adapter

An input adapter is made up of a minimum of three classes:

  • Configuration class
  • Factory class
  • Adapter class

The configuration class contains specific information for your input adapter instance. It is mandatory, but it can be an empty class with no members if you wish no configuration possibilities. The configuration class does not have to implement any interface.

The factory class is responsible for creating instances of the adapter class. You may have an adapter that works on point, interval and edge events. Then it is the responsibility of the factory class to instantiate the correct adapter class. The factory class has to implement any one of the factory interfaces: ITypedInputAdapterFactory or IInputAdapterFactory (for untyped input adapters). These interfaces have one method, Create, that should create an instance of the adapter class. An example is shown below.

public class StockQuoteInputFactory : ITypedInputAdapterFactory<StockQuoteInputConfig>
{
    public InputAdapterBase Create<TPayload>(StockQuoteInputConfig config, EventShape eventShape)
    {
        // Only support the point event model
        if (eventShape == EventShape.Point)
            return new StockQuoteTypedPointInput(config);
        else
            return default(InputAdapterBase);
    }

    public void Dispose()
    {
    }
}

The adapter class is responsible for retrieving events and enqueuing them in the stream. Adapters can be built for either push or pull based operation. The adapter class has to inherit from one of the base classes, such as TypedPointInputAdapter or EdgeInputAdapter. These have three methods you should override: Start, Resume and Dispose. You also need to implement a constructor that takes your configuration class as a parameter. Usually Start and Resume should bring the adapter into a loop where it checks the AdapterState property and enqueues events by calling CreateInsertEvent, Enqueue and EnqueueCtiEvent. When AdapterState changes from running to stopping, you need to clean up resources and call Stopped. An example is shown below.

public class StockQuoteTypedPointInput : TypedPointInputAdapter<StockQuote>
{
    // Event that could not be enqueued because the queue was full
    private PointEvent<StockQuote> pendingEvent;

    public StockQuoteTypedPointInput(StockQuoteInputConfig config)
    {
    }

    public override void Start()
    {
        ProduceEvents();
    }

    public override void Resume()
    {
        ProduceEvents();
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
    }

    private void ProduceEvents()
    {
        var currEvent = default(PointEvent<StockQuote>);

        EnqueueCtiEvent(DateTimeOffset.Now);
        try
        {
            while (AdapterState != AdapterState.Stopping)
            {
                if (pendingEvent != null)
                {
                    // Retry the event that was left over from the last run
                    currEvent = pendingEvent;
                    pendingEvent = null;
                }
                else
                {
                    currEvent = CreateInsertEvent();

                    // TODO: Add code here to populate your event
                }

                var result = Enqueue(ref currEvent);
                if (result == EnqueueOperationResult.Full)
                {
                    // Keep the event and wait until Resume is called
                    PrepareToResume(currEvent);
                    Ready();
                    return;
                }

                EnqueueCtiEvent(DateTimeOffset.Now);
                // TODO: Should check the return value of EnqueueCtiEvent also
            }

            if (pendingEvent != null)
            {
                currEvent = pendingEvent;
                pendingEvent = null;
            }

            PrepareToStop(currEvent);
            Stopped();
        }
        catch (AdapterException e)
        {
            // Error handling should go here
        }
    }

    private void PrepareToStop(PointEvent<StockQuote> currEvent)
    {
        if (currEvent != null)
        {
            ReleaseEvent(ref currEvent);
        }
    }

    private void PrepareToResume(PointEvent<StockQuote> currEvent)
    {
        pendingEvent = currEvent;
    }
}

Creating the output adapter

The design pattern of an output adapter is very similar to that of an input adapter. It is made up of the same minimum set of classes:

  • Configuration class
  • Factory class
  • Adapter class

The factory class implements ITypedOutputAdapterFactory or IOutputAdapterFactory depending on whether it is typed or untyped. The adapter class inherits from TypedPointOutputAdapter (or a similar base class). The pattern is similar to the input adapter except that you keep receiving events by calling Dequeue. An example of the main loop in the output adapter is shown below.

while (AdapterState != AdapterState.Stopping)
{
    result = Dequeue(out currEvent);

    if (result == DequeueOperationResult.Empty)
    {
        Ready();
        return;
    }
    else
    {
        // Handle your event

        ReleaseEvent(ref currEvent);
    }
}

Creating the queries

As a first step you need to instantiate your input adapters. That is done in the implicit development model by calling CepStream.Create. The CepStream is a generic class where you need to supply the type of your payload. As arguments to the method call you need to supply a name for your input stream, the type of the factory class, any configuration information, and the event shape (point, interval or edge). An example is shown below.

var inputStream = CepStream<StockQuote>.Create("input1", typeof(StockQuoteInputFactory), inputConfig, EventShape.Point);

The query is written in LINQ in the same way regardless of the development model, so we won't go into any details here. There are some query examples in the earlier article, "Getting started with StreamInsight". Let's just assume the query is called "output". The simplest possible query looks like this:

var output = from e in inputStream
             select e;

After having defined your query, you need to instantiate it and connect to an output adapter. That is done by calling the extension method ToQuery. Below is an example.

var query = output.ToQuery(typeof(StockQuoteOutputFactory), outputConfig, EventShape.Point, StreamEventOrder.ChainOrdered);

The arguments supplied to the ToQuery method include the type of the output adapter factory, the configuration, the event shape and a parameter describing the event ordering requirements. The event order can be fully ordered or chain ordered (which only guarantees that related events are sorted, such as matching EdgeStart and EdgeEnd).

Starting and monitoring the query

The query is started by calling the Start method. The query state can then be monitored using a diagnostic view. Below is an example of starting a query, waiting until it completes and then stopping it. In many cases, however, queries run indefinitely.

query.Start();

DiagnosticView diagnosticView;
do
{
    Thread.Sleep(100);
    diagnosticView = query.Application.Server.GetDiagnosticView(query.Name);
} while ((string)diagnosticView["QueryState"] == "Running");

query.Stop();

The query state is one example of the metrics available through diagnostic views. Several other metrics for performance monitoring and statistics are listed in the StreamInsight documentation. You can also get diagnostics for other objects, such as adapters and operators.
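A polling loop like the one above never ends if the query keeps running. One possible refinement is to put an upper limit on the waiting time. The sketch below is plain .NET and does not call StreamInsight; Polling.WaitWhile is a hypothetical helper, and the isRunning delegate is where a check such as the QueryState comparison shown above would go.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class Polling
{
    // Polls isRunning until it returns false or the timeout elapses.
    // Returns true if the work finished, false if the timeout was reached.
    public static bool WaitWhile(Func<bool> isRunning, TimeSpan pollInterval, TimeSpan timeout)
    {
        var stopwatch = Stopwatch.StartNew();
        while (isRunning())
        {
            if (stopwatch.Elapsed >= timeout)
                return false;
            Thread.Sleep(pollInterval);
        }
        return true;
    }
}
```

The caller can then decide what to do when the helper returns false, for example stop the query anyway or keep waiting.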

Usage scenarios

The implicit development model is especially good for developing and debugging adapters. Since the metadata is not persisted on disk, you can always rerun your applications from the beginning. Also you can monitor your adapters through breakpoints and enable StreamInsight tracing to write detailed information to a log file that can be reviewed using the StreamInsight Debugger.

The Explicit development model

The explicit development model gives you full control of your StreamInsight applications and full access to the StreamInsight API. It allows you to run your applications inside the StreamInsightHost and supports the reuse of queries and adapters. When running StreamInsight applications inside the StreamInsightHost you can monitor them using the StreamInsight Debugger.

References

When using the explicit development model you should reference the following DLLs:

  • Microsoft.ComplexEventProcessing.dll
  • Microsoft.ComplexEventProcessing.Adapters.dll
  • Microsoft.ComplexEventProcessing.Diagnostics.dll

You should add the following using-statements in your C# files:

  • using Microsoft.ComplexEventProcessing
  • using Microsoft.ComplexEventProcessing.Linq
  • using Microsoft.ComplexEventProcessing.Adapters
  • using Microsoft.ComplexEventProcessing.Diagnostics

Defining the payload

See the description under the Implicit development model. There is no difference in the development models in terms of the payload.

Connecting to a server / instantiating a server

External server

You can connect to a StreamInsight server using the Server.Connect method. It takes as an argument an EndpointAddress (from the System.ServiceModel namespace) built from the URI of the server, which could for example be "http://localhost/StreamInsight". The URI is determined by the host you are running (for example StreamInsightHost.exe). Below is an example.

var server = Server.Connect(new EndpointAddress("http://localhost/StreamInsight"));

In a host like StreamInsightHost.exe, applications are persisted to a local SQL Server CE database. You can create an application or use an existing one. Below is a sample of how to check whether an application already exists and create a new one if necessary.

Application application;

if (server.Applications.ContainsKey("StockInsight"))
    application = server.Applications["StockInsight"];
else
    application = server.CreateApplication("StockInsight");

In-process server

You create your own in-process StreamInsight server using the Server.Create method. Then you will need to create the application object. Metadata will not be persisted unless you supply a metadata provider, so you will always have to create the objects from scratch. Below is an example.

var server = Server.Create();
var application = server.CreateApplication("StockInsight");

Creating the input adapter

There's no difference in input adapters between the Explicit and Implicit development models, so we will not repeat how to develop them. However, the steps needed to use them are different.

If you run everything in-process you just need to have the classes included in your Visual Studio project. Otherwise if you want to run in an external process, you need to supply the host process (for example StreamInsightHost.exe) with your assembly containing the input adapter classes and any assemblies it depends on (in case they are not in the global assembly cache). This is accomplished by copying them into the same directory as StreamInsightHost.exe.

The input adapter needs to be registered in StreamInsight. That is done by calling the method CreateInputAdapter of your application object and providing a name and description as arguments. An example is shown below.

InputAdapter inputAdapter = application.CreateInputAdapter<StockQuoteInputFactory>("StockQuoteInput", "Description...");

Your input adapter is then ready to be instantiated and bound to a query. How to do that will be described a little later in this article.

Creating the output adapter

There's no difference in output adapters between the Explicit and Implicit development models, so we will not repeat how to develop them. However, the steps needed to use them are different.

Similarly to the input adapters, if you want to run your output adapters in an external host you need to supply that host with your assembly (by copying your DLL-file into the same directory as StreamInsightHost.exe if you use that as your host).

Output adapters are registered by calling the method CreateOutputAdapter as in the example below.

OutputAdapter outputAdapter = application.CreateOutputAdapter<StockQuoteOutputFactory>("StockQuoteOutput", "Description...");

Creating the query template

Query templates are unbound queries. They are expressed using the same LINQ syntax as for the other development models, but using named placeholders for the input streams.

The placeholder input streams are created as in the example below.

var input1 = CepStream<StockQuote>.Create("input1");

The query is then written using the placeholders and can be registered as a query template like in the example below.

var output = from e in input1
             where e.FieldID == "Close"
             select e;

var queryTemplate = application.CreateQueryTemplate("myTemplate", output);

Creating the query instance

A query instance is created by binding a query template together with input adapters and an output adapter. The easiest way to do that is with the QueryBinder class. The example below creates a QueryBinder that is attached to your query template.

var queryBinder = new QueryBinder(queryTemplate);

The query binder needs to bind each named placeholder to an input adapter of the same type. That is done by calling the method BindProducer as in the example below.

queryBinder.BindProducer<StockQuote>("input1", application.InputAdapters["StockQuoteInput"], new StockQuoteInputConfig(), EventShape.Point);

In a similar way the output adapter is bound by calling the method AddConsumer as shown below.

queryBinder.AddConsumer<StockQuote>("output", application.OutputAdapters["StockQuoteOutput"], new StockQuoteOutputConfig(), EventShape.Point, StreamEventOrder.ChainOrdered);

Finally when all the loose ends of the query template have been bound, the query instance is created by calling the method CreateQuery.

var query = application.CreateQuery("myQuery", queryBinder, "Description...");

Starting and monitoring the query instance

The query instance is started and monitored in the same way as in the Implicit development model, regardless of whether it runs in-process or externally. The StreamInsight API takes care of interfacing with your environment.

Below is an example of starting a query, waiting until it completes and then stopping it. In many cases, however, queries run indefinitely.

query.Start();

DiagnosticView diagnosticView;
do
{
    Thread.Sleep(100);
    diagnosticView = query.Application.Server.GetDiagnosticView(query.Name);
} while ((string)diagnosticView["QueryState"] == "Running");

query.Stop();

The query state is one example of the metrics available through diagnostic views. Several other metrics for performance monitoring and statistics are listed in the StreamInsight documentation. You can also get diagnostics for other objects, such as adapters and operators.

Usage scenarios

The explicit development model is the preferred method in most scenarios because it supports features such as reusability of objects that are not possible in the other development models. Also it allows your applications to be run within the StreamInsight Host.

A typical scenario is to develop adapters using the implicit development model and then use them in a full scale application using the explicit development model.

Conclusion

The three different development models in StreamInsight have their advantages and disadvantages. Basically the main choice is whether to use the IObservable/IObserver pattern or work directly with the StreamInsight API. The choice depends on what kind of application is being developed and whether to prioritize easy integration with other .NET objects or detailed control of the behavior.

The next article will go more into depth about StreamInsight queries and how to develop user-defined functions and aggregations.

About the Author

Johan Åhlén is an independent Business Intelligence professional, all-round data enthusiast, inventor, musician, serial entrepreneur and information archeologist. He has coded since he was 9, around the time MS DOS 2.0 was released. Johan has been working with SQL Server since version 6.5 and is one of the leaders of the Swedish SQL Server User Group. Although his work is often in the borderland between technology and business (or psychology), he is eagerly following new technologies. Feel free to contact Johan through his blog.

 

The Series

This article is part of a series that is designed to help you quickly understand and begin using StreamInsight. You can view the entire series here.
