SQLServerCentral is supported by Redgate

Generate an SSIS package that reads from Twitter using Biml

By Johan Åhlén

In an earlier article, I described how to autogenerate SSIS file import packages using Biml. It is also possible to read data from more advanced data sources by generating a Script component. While you may not have much use for autogenerated SSIS packages that read from Twitter, the scenario serves as a good example, and being able to autogenerate any SSIS package that contains a Script component is very useful.

This article requires that you use either BIDS Helper (a free add-on for Visual Studio) or MIST (a commercial product from Varigence, the company behind Biml).

The goal of this article

Our goal is to generate an SSIS package that performs these steps:

  1. Truncate a database table
  2. Read from Twitter
  3. Write to the database table

Again, the main purpose of this article is to show how to create a Script component with Biml. Twitter is just used as an example.

Twitter API

Twitter has several APIs, and one of the most useful is the Search API. This API is open to anyone (although it requires registering an account), and it is relatively simple.

Authentication

Twitter uses OAuth for authentication and supports several options. I've chosen Application-Only authentication, but with small changes to my example you could authenticate as a user instead.

Authentication requires a registered application. Registering one is simple and free, and it gives you a Consumer Key and a Consumer Secret.

To authenticate, you send your Consumer Key and Consumer Secret to Twitter's OAuth 2 token endpoint (https://api.twitter.com/oauth2/token in the Biml source code at the end of this article) and receive a Bearer Token in return. This Bearer Token must then be included in every subsequent call to the Search API.
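The token request can be sketched as follows. This is a minimal sketch under stated assumptions, not the article's code. It only prepares the request pieces and parses a response body, so it runs without network access. The names `TwitterAuthSketch`, `BuildBasicCredentials` and `ExtractAccessToken` are hypothetical. Twitter's application-only flow asked for the key and secret to be URL-encoded, joined with a colon and Base64-encoded. The assumption here is that `Uri.EscapeDataString` is an adequate encoder, since keys are normally plain alphanumerics.

```csharp
using System;
using System.Text;
using System.Text.RegularExpressions;

// Hypothetical helper (not part of the article's Biml): prepares an application-only
// OAuth 2 token request and parses the response, without doing any network I/O.
public static class TwitterAuthSketch
{
    // Form-encoded body of the POST to https://api.twitter.com/oauth2/token
    public const string TokenRequestBody = "grant_type=client_credentials";

    // Value for the Authorization header: "Basic " + Base64(encodedKey:encodedSecret).
    // Assumption: Uri.EscapeDataString is an acceptable URL encoder for key and secret.
    public static string BuildBasicCredentials(string consumerKey, string consumerSecret)
    {
        var raw = Uri.EscapeDataString(consumerKey) + ":" + Uri.EscapeDataString(consumerSecret);
        return "Basic " + Convert.ToBase64String(Encoding.UTF8.GetBytes(raw));
    }

    // Pulls the token out of a body such as {"token_type":"bearer","access_token":"..."},
    // using the same regex idea as getAccessToken in ScriptMain.cs.
    public static string ExtractAccessToken(string responseBody)
    {
        var match = Regex.Match(responseBody, "\"access_token\"\\s*:\\s*\"(.+?)\"");
        if (!match.Success)
            throw new InvalidOperationException("No access_token found in the token response.");
        return match.Groups[1].Value;
    }
}
```

In the Script component, the header value would be passed to `oauthRequest.Headers.Add("Authorization", ...)`, and `TokenRequestBody` would be written to the request stream, just as `getAccessToken` does.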

Twitter Search API

The Search API is very simple to use. Send your search parameters in the URL of an HTTP GET request to the Twitter server, and you get the search results back in JSON format. For example:

https://api.twitter.com/1.1/search/tweets.json?q=%23mssql

searches for tweets tagged with #mssql (%23 is the URL-encoded form of #).
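Instead of typing %23 by hand, you can build the query path from a plain search string. The sketch below assumes the path format used by the TwitterQuery variable, which is relative to https://api.twitter.com. The class and method names are hypothetical. `count` is an optional Search API parameter that limits the number of results per page; the standard v1.1 API capped it at 100.

```csharp
using System;

// Hypothetical helper: builds the relative Search API path that the
// TwitterQuery package variable holds, e.g. /1.1/search/tweets.json?q=%23mssql
public static class SearchQuerySketch
{
    public static string BuildSearchPath(string query, int? count = null)
    {
        // Uri.EscapeDataString percent-encodes reserved characters: '#' -> %23, ' ' -> %20
        var path = "/1.1/search/tweets.json?q=" + Uri.EscapeDataString(query);
        if (count.HasValue)
            path += "&count=" + count.Value;
        return path;
    }
}
```

For example, `BuildSearchPath("#mssql")` yields the same value as the TwitterQuery variable in the Biml.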

When calling the Search API, don't forget to add the Authorization header. With a WebRequest, this takes a single line of code:

request.Headers.Add("Authorization", "Bearer " + token);
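The article uses WebRequest, which .NET 6 and later mark as obsolete in favor of HttpClient. If you reuse the code outside SSIS on a newer .NET, the following hedged sketch builds the same bearer-authenticated request as an `HttpRequestMessage` from System.Net.Http. `BearerRequestSketch` and `BuildSearchRequest` are hypothetical names. The sketch only builds the request and does not send it.

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;

// Hypothetical sketch: the Search API request from callSearchApi, expressed with
// HttpRequestMessage instead of WebRequest. Nothing is sent over the network here.
public static class BearerRequestSketch
{
    public static HttpRequestMessage BuildSearchRequest(string queryPath, string accessToken)
    {
        var request = new HttpRequestMessage(HttpMethod.Get, "https://api.twitter.com" + queryPath);
        // Same effect as request.Headers.Add("Authorization", "Bearer " + token)
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);
        return request;
    }
}
```

To send it, you would pass the request to `SendAsync` on a shared `HttpClient` instance and read the body with `response.Content.ReadAsStringAsync()`.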

The search results look something like this (truncated):

{"statuses":[{"created_at":"Tue Apr 19 05:06:27 +0000 2016","id":722290241235730432,"id_str":"722290241235730432","text":"Build ISA QA Trade 2015.01_20160419.1 succeeded for New Tech in 186 minutes #TFS #ContinuosIntegration #MSSQL #SSIS",...

JSON parser

I wrote my own simple JSON parser. It is not intended for production use, so if you are going to run this in a production environment, you should use a proper JSON library instead.
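As one example of such a library, here is a hedged sketch that reads the same four fields with System.Text.Json. System.Text.Json is built into .NET Core 3.0 and later and is available as a NuGet package for .NET Framework. Referencing it from an SSIS Script component may require extra deployment steps, which this sketch does not cover. `StatusReaderSketch` and its `Status` class are hypothetical.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical sketch: extracts id_str, text, user.name and created_at from a
// Search API response using System.Text.Json (assumes .NET Core 3.0+ or the NuGet package).
public static class StatusReaderSketch
{
    public sealed class Status
    {
        public string IdStr { get; set; }
        public string Text { get; set; }
        public string Username { get; set; }
        public string CreatedAt { get; set; }
    }

    public static List<Status> ReadStatuses(string json)
    {
        var result = new List<Status>();
        using (var doc = JsonDocument.Parse(json))
        {
            foreach (var s in doc.RootElement.GetProperty("statuses").EnumerateArray())
            {
                result.Add(new Status
                {
                    IdStr = s.GetProperty("id_str").GetString(),
                    // GetString() decodes JSON escape sequences such as \n and \u2026
                    Text = s.GetProperty("text").GetString(),
                    Username = s.GetProperty("user").GetProperty("name").GetString(),
                    CreatedAt = s.GetProperty("created_at").GetString()
                });
            }
        }
        return result;
    }
}
```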

Generating the SSIS-package

The rest of this walkthrough consists of these steps:

  1. Create the database table
  2. Create the SSIS project (in BIDS Helper/Visual Studio) or a project in MIST
  3. Add the Biml to your project
  4. Generate the SSIS-package

Create the database table

I have named my database JohanBimlDemo. You can of course use a different name; just remember to update every occurrence in the Biml.

Here's the SQL code to create the table:

USE JohanBimlDemo
GO
CREATE TABLE [dbo].[Twitter]
(
    [IdStr] [nvarchar](50) NOT NULL PRIMARY KEY CLUSTERED,
    [Text] [nvarchar](250) NOT NULL,
    [Username] [nvarchar](50) NOT NULL,
    [CreatedAt] [nvarchar](50) NOT NULL
);

Create a project

If you are using BIDS Helper, create an SSIS project as usual in Visual Studio: New Project... -> Business Intelligence -> Integration Services Project.

If you are using MIST instead, create a project there.

Biml

The complete Biml source code is pretty long, so I've added it to the end of this article (and also attached it as a file).

To add it in BIDS Helper / Visual Studio, create a new Biml file and paste the Biml source code into it.

Generate the SSIS-package

Before you can read from the Twitter Search API, you must enter your authentication information. Find these lines in the Biml source code and replace the placeholder text with your own data:

<Variable Name="ConsumerKey" DataType="String" >Enter your Consumer Key here</Variable>
<Variable Name="ConsumerSecret" DataType="String">Enter your Consumer Secret here</Variable>
<Variable Name="TwitterQuery" DataType="String">/1.1/search/tweets.json?q=%23mssql</Variable>

If you use BIDS Helper, generate the SSIS package by right-clicking your Biml file and selecting "Generate SSIS Packages".

In MIST, just select Build.

Your SSIS package is then ready to test!

How it works

Now for the most important part: how the Script component works.

The source code starts with a Connection and some Variables. Then comes a Tasks element, which begins with an ExecuteSQL task that truncates the table. After that, inside the Dataflow element, the most important part begins.

ScriptComponentSource

A ScriptComponentSource uses a ScriptComponentProject to define the actual script.

The ScriptComponentProject can be defined either "globally" (shared between ScriptComponentSources) or inline directly under a ScriptComponentSource. I have chosen the "inline" option.

The ScriptComponentProject is probably best explained by looking at my example.

  • Note that there is a ScriptComponentProject nested inside another ScriptComponentProject. Biml requires this nesting, although I have no good explanation why.
  • It's important that you add the right Assembly References. My example shows the minimum set you should include.
  • You have to add at least one OutputBuffer, where the ScriptComponentSource delivers its output. I have included four fields from Twitter, but you can easily add more.
  • You need a ReadOnlyVariables (or ReadWriteVariables) element to make your package variables available inside your script.
  • Finally come the source files. Normally you create two: AssemblyInfo.cs (containing assembly metadata) and ScriptMain.cs (containing your actual script code).

The most important method in ScriptMain is CreateNewOutputRows, which a source Script component overrides to add rows to its output. Here is how it authenticates and fetches the search results:

var accessToken = getAccessToken(consumerKey, consumerSecret);
var jsontext = callSearchApi(queryString, accessToken);
var jObj = new JObject(jsontext);

The search results are written to the OutputBuffer this way:

foreach (JObject status in (JArray)jObj["statuses"])
{
    // Reading a few fields. More Twitter fields can of course be read the same way.
    var id_str = status["id_str"].ToString();
    var text = status["text"].ToString();
    var user = (JObject)status["user"];
    var username = user["name"].ToString();
    var created_at = status["created_at"].ToString();

    // Send to output buffer
    Output0Buffer.AddRow();
    Output0Buffer.IdStr = id_str;
    Output0Buffer.Text = text;
    Output0Buffer.Username = username;
    Output0Buffer.CreatedAt = created_at;
}

Output0Buffer.SetEndOfRowset();
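Each output column in the Biml has a fixed Length: 50 characters for IdStr, Username and CreatedAt, and 250 for Text. Assigning a longer string to an Output0Buffer column typically makes the Script component fail at run time. A defensive variant could therefore trim each value before assigning it. The sketch below is a minimal version of that idea. `FitToColumn` is a hypothetical helper, and silent truncation is only one possible policy; logging or redirecting the row is another.

```csharp
// Hypothetical helper: trims a value to an output column's declared Length and maps
// null to an empty string, because the target table's columns are NOT NULL.
public static class ColumnFitSketch
{
    public static string FitToColumn(string value, int maxLength)
    {
        if (value == null)
            return string.Empty;
        if (value.Length <= maxLength)
            return value;

        // Avoid cutting a surrogate pair (e.g. an emoji) in half
        var cut = maxLength;
        if (cut > 0 && char.IsHighSurrogate(value[cut - 1]))
            cut--;
        return value.Substring(0, cut);
    }
}
```

Inside the loop, it would be used like `Output0Buffer.Text = ColumnFitSketch.FitToColumn(text, 250);`.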

OleDbDestination

There is nothing special about this data flow component. We just specify the ConnectionName, JohanBimlDemoConnection, and the destination table, [dbo].[Twitter], in the ExternalTableOutput element.

Biml source code

<Biml xmlns="http://schemas.varigence.com/biml.xsd">
  <Annotations>
    <Annotation>
      TwitterScriptTaskDemo.biml
      Biml Demo using script task for reading from Twitter
      Written by Johan Åhlén 2016
    </Annotation>
    <Annotation>
      Table script:
      CREATE TABLE [dbo].[Twitter]
      (
        [IdStr] [nvarchar](50) NOT NULL PRIMARY KEY CLUSTERED,
        [Text] [nvarchar](250) NOT NULL,
        [Username] [nvarchar](50) NOT NULL,
        [CreatedAt] [nvarchar](50) NOT NULL
      );
    </Annotation>
  </Annotations>
  <Connections>
    <Connection Name="JohanBimlDemoConnection" ConnectionString="Provider=SQLNCLI11;Data Source=localhost;Integrated Security=SSPI;Initial Catalog=JohanBimlDemo" />
  </Connections>
  <Packages>
    <Package Name="TwitterBimlDemo" ConstraintMode="Linear" ProtectionLevel="EncryptSensitiveWithUserKey">
      <Variables>
        <!-- Enter your Twitter API keys here... -->
        <Variable Name="ConsumerKey" DataType="String">Enter your Consumer Key</Variable>
        <Variable Name="ConsumerSecret" DataType="String">Enter your Consumer Secret</Variable>
        <Variable Name="TwitterQuery" DataType="String">/1.1/search/tweets.json?q=%23mssql</Variable>
      </Variables>
      <Tasks>
        <ExecuteSQL Name="Truncate" ConnectionName="JohanBimlDemoConnection" ResultSet="None">
          <DirectInput>TRUNCATE TABLE [dbo].[Twitter]</DirectInput>
        </ExecuteSQL>
        <Dataflow Name="Read from Twitter">
          <Transformations>
            <ScriptComponentSource Name="Twitter">
              <ScriptComponentProject>
                <ScriptComponentProject ProjectCoreName="TwitterScript" Name="TwitterScript">
                  <AssemblyReferences>
                    <AssemblyReference AssemblyPath="System" />
                    <AssemblyReference AssemblyPath="System.Data" />
                    <AssemblyReference AssemblyPath="System.Windows.Forms" />
                    <AssemblyReference AssemblyPath="System.Xml" />
                    <AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSPipelineWrap" />
                    <AssemblyReference AssemblyPath="Microsoft.SqlServer.DTSRuntimeWrap" />
                    <AssemblyReference AssemblyPath="Microsoft.SqlServer.PipelineHost" />
                    <AssemblyReference AssemblyPath="Microsoft.SqlServer.TxScript" />
                  </AssemblyReferences>
                  <OutputBuffers>
                    <OutputBuffer Name="Output0" IsSynchronous="false">
                      <Columns>
                        <Column Name="IdStr" DataType="String" Length="50"></Column>
                        <Column Name="Text" DataType="String" Length="250"></Column>
                        <Column Name="Username" DataType="String" Length="50"></Column>
                        <Column Name="CreatedAt" DataType="String" Length="50"></Column>
                      </Columns>
                    </OutputBuffer>
                  </OutputBuffers>
                  <ReadOnlyVariables>
                    <Variable VariableName="ConsumerKey" DataType="String" Namespace="User"/>
                    <Variable VariableName="ConsumerSecret" DataType="String" Namespace="User"/>
                    <Variable VariableName="TwitterQuery" DataType="String" Namespace="User"/>
                  </ReadOnlyVariables>
                  <Files>
                    <File Path="AssemblyInfo.cs">
using System.Reflection;
using System.Runtime.CompilerServices;
//
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
//
[assembly: AssemblyTitle("TwitterScript.csproj")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("TwitterScript.csproj")]
[assembly: AssemblyCopyright("Copyright @ Johan Åhlén 2016")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
//
// Version information for an assembly consists of the following four values:
//
//      Major Version
//      Minor Version
//      Build Number
//      Revision
//
// You can specify all the values or you can default the Revision and Build Numbers
// by using the '*' as shown below:
[assembly: AssemblyVersion("1.0.*")]
                    </File>
                    <File Path="ScriptMain.cs">
using System;
using System.Data;
using System.Text;
using System.Text.RegularExpressions;
using System.Net;
using System.Collections.Generic;
using System.IO;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    public override void CreateNewOutputRows()
    {
        var consumerKey = Variables.ConsumerKey;
        var consumerSecret = Variables.ConsumerSecret;
        var queryString = Variables.TwitterQuery;

        var accessToken = getAccessToken(consumerKey, consumerSecret);
        var jsontext = callSearchApi(queryString, accessToken);
        var jObj = new JObject(jsontext);

        foreach (JObject status in (JArray)jObj["statuses"])
        {
            // Reading a few fields. More Twitter fields can of course be read the same way.
            var id_str = status["id_str"].ToString();
            var text = status["text"].ToString();
            var user = (JObject)status["user"];
            var username = user["name"].ToString();
            var created_at = status["created_at"].ToString();

            // Send to output buffer
            Output0Buffer.AddRow();
            Output0Buffer.IdStr = id_str;
            Output0Buffer.Text = text;
            Output0Buffer.Username = username;
            Output0Buffer.CreatedAt = created_at;
        }

        Output0Buffer.SetEndOfRowset();
    }

    private string callSearchApi(string queryString, string accessToken)
    {
        // Prepare request
        var searchRequest = WebRequest.Create("https://api.twitter.com" + queryString);
        searchRequest.Method = "GET";
        searchRequest.UseDefaultCredentials = false;
        searchRequest.Headers.Add("Authorization", "Bearer " + accessToken);

        // Get the response and read the whole body; the using blocks dispose the response and the reader
        using (var searchResponse = searchRequest.GetResponse())
        using (var reader = new StreamReader(searchResponse.GetResponseStream()))
        {
            return reader.ReadToEnd();
        }
    }

    private string getAccessToken(string consumerKey, string consumerSecret)
    {
        // Prepare request: Basic authentication with Base64("key:secret")
        var oauthString = Convert.ToBase64String(Encoding.ASCII.GetBytes(consumerKey + ":" + consumerSecret));
        var oauthRequest = WebRequest.Create("https://api.twitter.com/oauth2/token");
        oauthRequest.Method = "POST";
        oauthRequest.UseDefaultCredentials = false;
        oauthRequest.Headers.Add("Authorization", "Basic " + oauthString);
        oauthRequest.ContentType = "application/x-www-form-urlencoded;charset=UTF-8";
        var postContent = Encoding.ASCII.GetBytes("grant_type=client_credentials");
        oauthRequest.ContentLength = postContent.Length;
        using (var requestStream = oauthRequest.GetRequestStream())
        {
            requestStream.Write(postContent, 0, postContent.Length);
        }

        // Get the response and read the whole body
        string responseBody;
        using (var oauthResponse = oauthRequest.GetResponse())
        using (var reader = new StreamReader(oauthResponse.GetResponseStream()))
        {
            responseBody = reader.ReadToEnd();
        }

        // Extract the bearer token from {"token_type":"bearer","access_token":"..."}
        var match = Regex.Match(responseBody, "\"access_token\":\"(.+?)\"");
        if (!match.Success)
            throw new Exception("getAccessToken: no access_token found in the response");

        return match.Groups[1].Value;
    }
}
 
// Using a very simple JSON implementation. If used for production, it should be replaced with a proper JSON library.
public class JArray : List&lt;object&gt;
{
    internal void Parse(string jsontext, ref int i)
    {
        i = JToken.LookFor(jsontext, i, '[', true);
        i++;

        // Check for an empty array, allowing whitespace inside: [ ]
        i = JToken.SkipWhitespace(jsontext, i);
        if (jsontext[i] == ']')
        {
            i++;
            return;
        }

        // Loop through members
        do
        {
            // Read object
            var obj = JToken.Parse(jsontext, ref i);

            this.Add(obj);

            // Skip whitespace
            i = JToken.SkipWhitespace(jsontext, i);

            // Check if end of array or another member
            var ch = jsontext[i++];
            if (ch == ']')
                break;
            else if (ch == ',')
                continue;
            else
                throw new Exception("JArray.Parse: expected ] or ,");
        }
        while (true);
    }

    public JArray(string srctext)
        : base()
    {
        var i = 0;
        Parse(srctext, ref i);
    }

    internal JArray(string jsontext, ref int i)
        : base()
    {
        Parse(jsontext, ref i);
    }
}

public static class JToken
{
    internal static int LookFor(string jsontext, int startpos, char searchCh, bool skipWhitespace = false)
    {
        var i = jsontext.IndexOf(searchCh, skipWhitespace ? SkipWhitespace(jsontext, startpos) : startpos);
        if (i == -1)
            throw new Exception("JToken.LookFor: can't find character " + searchCh);

        return i;
    }

    internal static int LookForUnescaped(string jsontext, int startpos, char searchCh)
    {
        var i = startpos;
        while (i &lt; jsontext.Length)
        {
            if (jsontext[i] == searchCh)
            {
                // Count the backslashes directly before the character. An even count
                // (including zero) means the character itself is not escaped, so "a\\" ends correctly.
                var backslashes = 0;
                for (var k = i - 1; k &gt;= startpos; k--)
                {
                    if (jsontext[k] != '\\')
                        break;
                    backslashes++;
                }

                if (backslashes % 2 == 0)
                    return i;
            }

            i++;
        }

        throw new Exception("JToken.LookForUnescaped: can't find character " + searchCh);
    }

    internal static int SkipWhitespace(string jsontext, int startpos)
    {
        for (var i = startpos; i &lt; jsontext.Length; i++)
            if (!char.IsWhiteSpace(jsontext[i]))
                return i;

        throw new Exception("JToken.SkipWhitespace: end of string reached");
    }

    public static object Parse(string jsontext, ref int i)
    {
        i = SkipWhitespace(jsontext, i);

        if (jsontext[i] == '"')
            return ReadStr(jsontext, ref i);
        else if (jsontext[i] == '{')
            return ReadJObject(jsontext, ref i);
        else if (jsontext[i] == '[')
            return ReadArray(jsontext, ref i);
        else if (jsontext[i] == 'n')
            return ReadNull(jsontext, ref i);
        else if (char.IsLetter(jsontext[i]))
            return ReadBool(jsontext, ref i);
        else if (char.IsDigit(jsontext[i]) || jsontext[i] == '-')
            return ReadNumber(jsontext, ref i);
        else
            throw new Exception("JToken.Parse: unknown object type");
    }

    internal static object ReadNull(string jsontext, ref int i)
    {
        if (jsontext.Length &gt;= i + 4)
        {
            if (jsontext.Substring(i, 4) == "null")
            {
                i += 4;
                return null;
            }
        }
        throw new Exception("JToken.ReadNull: unknown object type");
    }

    internal static string ReadStr(string jsontext, ref int i)
    {
        var j = LookForUnescaped(jsontext, i + 1, '"');
        var raw = jsontext.Substring(i + 1, j - i - 1);
        i = j + 1;
        return Unescape(raw);
    }

    // Decodes JSON escape sequences: \" \\ \/ \b \f \n \r \t and \uXXXX
    internal static string Unescape(string raw)
    {
        if (raw.IndexOf('\\') == -1)
            return raw;

        var sb = new StringBuilder(raw.Length);
        for (var k = 0; k &lt; raw.Length; k++)
        {
            var ch = raw[k];
            if (ch != '\\' || k + 1 == raw.Length)
            {
                sb.Append(ch);
                continue;
            }

            var esc = raw[++k];
            switch (esc)
            {
                case 'b': sb.Append('\b'); break;
                case 'f': sb.Append('\f'); break;
                case 'n': sb.Append('\n'); break;
                case 'r': sb.Append('\r'); break;
                case 't': sb.Append('\t'); break;
                case 'u':
                    // Four hex digits; surrogate pairs arrive as two consecutive \u escapes
                    sb.Append((char)Convert.ToInt32(raw.Substring(k + 1, 4), 16));
                    k += 4;
                    break;
                default:
                    // \" \\ and \/ stand for the character itself
                    sb.Append(esc);
                    break;
            }
        }

        return sb.ToString();
    }

    internal static JObject ReadJObject(string jsontext, ref int i)
    {
        var jobj = new JObject(jsontext, ref i);
        return jobj;
    }

    internal static JArray ReadArray(string jsontext, ref int i)
    {
        var jarr = new JArray(jsontext, ref i);
        return jarr;
    }

    internal static bool ReadBool(string jsontext, ref int i)
    {
        if (jsontext.Substring(i, 4) == "true")
        {
            i += 4;
            return true;
        }
        else if (jsontext.Substring(i, 5) == "false")
        {
            i += 5;
            return false;
        }
        else
            throw new Exception("JToken.ReadBool: Unknown value");
    }

    internal static double ReadNumber(string jsontext, ref int i)
    {
        // Collect sign, digits, decimal point and exponent, e.g. -12.5e3
        var start = i;
        while (i &lt; jsontext.Length)
        {
            var ch = jsontext[i];
            if (!(char.IsDigit(ch) || ch == '-' || ch == '+' || ch == '.' || ch == 'e' || ch == 'E'))
                break;
            i++;
        }

        return Convert.ToDouble(jsontext.Substring(start, i - start), System.Globalization.NumberFormatInfo.InvariantInfo);
    }
}

public class JObject
{
    private SortedDictionary&lt;string, object&gt; contents = new SortedDictionary&lt;string, object&gt;();

    private JObject()
    {
    }

    internal void Parse(string jsontext, ref int i)
    {
        i = JToken.LookFor(jsontext, i, '{', true);
        i++;

        // Check for an empty object: {}
        i = JToken.SkipWhitespace(jsontext, i);
        if (jsontext[i] == '}')
        {
            i++;
            return;
        }

        // Loop through members
        do
        {
            // Look for label start (")
            i = JToken.LookFor(jsontext, i, '"', true);

            // Read label; ReadStr finds the closing quote and moves i past it
            var label = JToken.ReadStr(jsontext, ref i);

            // Look for colon
            i = JToken.LookFor(jsontext, i, ':', true);

            // Move to next character and skip whitespace
            i = JToken.SkipWhitespace(jsontext, i + 1);

            // Read token
            var token = JToken.Parse(jsontext, ref i);

            contents.Add(label, token);

            // Skip whitespace
            i = JToken.SkipWhitespace(jsontext, i);

            // Check if end of object or another member
            var ch = jsontext[i++];
            if (ch == '}')
                break;
            else if (ch == ',')
                continue;
            else
                throw new Exception("JObject.Parse: expected } or ,");
        }
        while (true);
    }

    public JObject(string jsontext)
    {
        var i = 0;
        Parse(jsontext, ref i);
    }

    internal JObject(string jsontext, ref int i)
    {
        Parse(jsontext, ref i);
    }

    public object this[string name]
    {
        get
        {
            return contents[name];
        }
    }
}
                    </File>
                  </Files>
                </ScriptComponentProject>
              </ScriptComponentProject>
            </ScriptComponentSource>
            <OleDbDestination Name="Database" ConnectionName="JohanBimlDemoConnection">
              <ExternalTableOutput Table="[dbo].[Twitter]"></ExternalTableOutput>
            </OleDbDestination>
          </Transformations>
        </Dataflow>
      </Tasks>
    </Package>
  </Packages>
</Biml>
 

Resources:

TwitterScriptTaskDemo.biml