SQLServerCentral Article

A Common Architecture for Loading Data


Writing an ETL application that runs efficiently is always a welcome challenge in a large organization. As we all know, time is a constraint during the nightly batch process, and the allocated window must be used wisely. Recently I was given the task of designing a service that loads data into a SQL Server database. The extracts arrive as pipe-separated text files, eighteen in all. The challenge was to create one loader that could handle all eighteen extracts at the same time and absorb entirely new feeds without major new development. The design I came up with is flexible enough that structural changes to the source files were accommodated easily without changing the framework code. The initial requirement was to load only eight extracts, but the project ended up with eighteen; I accepted the new extracts without changing the framework code.

It is important to pay attention to performance and scalability. My goal was an ETL application that loads multiple files with good performance and accepts new extracts without modifying the existing framework code. The design choice was a no-brainer: I chose a "plug-in" architecture as the solution. This approach provides increased flexibility for future enhancements.


Framework functionalities

  • Finding Extract files
  • Verifying Extract files integrity
  • Loading into staging tables
  • Creating dynamic format files
  • Running components (e.g., multithreading)
  • Integrating component results in the database
  • Unloading component

The advantage of the plug-in approach is that it reduces future additions to metadata changes. A metadata table contains the unique name of each extract, the name of the stored procedure that uploads its data from staging to production tables, the position of the unique key in the data file, and so on.

Here is the flow diagram

It is always interesting to create a high-performance application, so I decided to combine running BCP (Bulk Copy Program) asynchronously with uploading data from the staging tables into the production tables on multiple threads. BCP, part of the SQL Server 2000 client tools, is a very powerful utility for bulk loading data.

I used the System.Diagnostics namespace to run BCP asynchronously and load data into the staging tables. While the application loads data into one staging table, another process reads the source folder and prepares the next data file for upload.

Synchronous code is easier to write, but one of the biggest gains of asynchronous, multithreaded programming is avoiding bottlenecks; synchronous programming can introduce unnecessary dependencies between steps. There are many more reasons to choose asynchronous multithreaded programming over synchronous programming. More information can be found at http://msdn.microsoft.com/msdnmag/issues/05/08/concurrency/default.aspx.

So I created two C# components:

  1. An executable that acts as the loader framework
  2. A common class (objLibrary) that contains shared functionality such as error logging, encryption/decryption, and database connections

The database is SQL Server 2000.
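
The article does not show objLibrary itself; as a rough illustration of the duties listed above, a helper class might look like the following sketch. All member names here are assumptions of mine except EventID, ErrType, ErrorRoutine, Read, and Decrypt, which appear in the code later in the article.

//Hypothetical sketch of the shared helper class; not the original code.
//Assumes using System.Configuration and System.Diagnostics.
public class ObjLibrary
{
    public int EventID;
    public System.Diagnostics.EventLogEntryType ErrType;

    //Error logging duty: write an entry to the Windows event log
    public void ErrorRoutine(string message, string source)
    {
        System.Diagnostics.EventLog.WriteEntry(source, message, ErrType, EventID);
    }

    //Read a named value (e.g., an encrypted connection string) from the config file
    public string Read(string key)
    {
        return System.Configuration.ConfigurationSettings.AppSettings[key];
    }

    //Decrypt a stored secret; the actual scheme is not described in the article
    public string Decrypt(string cipherText)
    {
        throw new System.NotImplementedException("Decryption scheme not shown in the article.");
    }
}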

Here are the functionalities of the loader framework

  1. Read the source folder for data files to be loaded. Multiple source files can be loaded asynchronously using System.Diagnostics.Process, based on a flag value in the executable's configuration file.

Example: Here is the variable from the config file; the application uses this value to decide the course of action.

<add key="NumberOfFilesAtaTime" value="18"/>

Include the System.Diagnostics namespace in the C# code. The following code runs inside a loop that enumerates the files in the specified directory with Directory.GetFiles and processes each one.

System.Diagnostics.Process p = null;

//strSourceFolder and proc are set up elsewhere in the framework
foreach (string file in System.IO.Directory.GetFiles(strSourceFolder))
{
    //...build the ProcessStartInfo for this file, then start BCP...
    p = System.Diagnostics.Process.Start(proc);

    //Process one file at a time: wait for each BCP process to finish
    if (System.Configuration.ConfigurationSettings.AppSettings["NumberOfFilesAtaTime"]=="1")
    {
        p.WaitForExit();
    }
}

//This executes after the loop to ensure all extracts are loaded
//(note: it waits only on the last process started)
if (System.Configuration.ConfigurationSettings.AppSettings["NumberOfFilesAtaTime"]!="1")
{
    p.WaitForExit();
}
  2. Read the static table. This static table contains the names of the tables, primary-key information, and the names of the stored procedures used for uploading data from the staging tables into the production tables. Here is a sample DDL for the static table.
CREATE TABLE [dbo].[LoaderDetails](
            [L_LoaderName] [varchar](50)  NOT NULL,
            [L_StoredProcedureName] [varchar](80)  NULL,
            [L_LoaderKeyColumnNumber] [int] NULL,
            [L_KeyIDColName] [varchar](50)  NULL,
            [L_LoaderDescription] [varchar](50)  NULL,
            [L_LastUpdatedTime] [datetime] NULL,
            [L_LastStatusMessage] [varchar](500)  NULL,
            [L_BPSKeyLoader] [bit] NULL
) ON [PRIMARY]
  3. In this application, format files are used heavily with BCP to upload data. I use format files to selectively pick columns from the source file to be imported into a SQL Server table. For example, if a source file contains twenty columns and I need data from columns 3, 7, 15, and 16, format files make this task easy. The format files are created nightly; the primary reason for regenerating them regularly is to accommodate table structure changes. I use the schema of the staging tables to do this.

    Create format files using the schema of the staging tables. The format files are created every night before the batch begins, ensuring any table changes are reflected in the data load. The table names are stored in the static table. For example, if the staging table ABC has six columns, a format file with six fields is created dynamically using the SqlDataAdapter and StreamWriter classes (see the sketch after the sample format file below).

    8.0
    7
    1 SQLCHAR 0 0 "\n" 0 "" ""
    2 SQLCHAR 0 0 "|" 1 a0 Finnish_Swedish_CS_AS
    3 SQLCHAR 0 0 "|" 2 a1 Finnish_Swedish_CS_AS
    4 SQLCHAR 0 0 "|" 3 a2 Finnish_Swedish_CS_AS
    5 SQLCHAR 0 0 "|" 4 a3 Finnish_Swedish_CS_AS
    6 SQLCHAR 0 0 "|" 5 a4 Finnish_Swedish_CS_AS
    7 SQLCHAR 0 0 "\r" 6 a6 Finnish_Swedish_CS_AS
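
The article does not show the generation code itself; here is a minimal sketch of the idea, assuming a pipe-delimited layout like the sample above, a hard-coded collation, and the hypothetical method and parameter names below (requires using System.Data, System.Data.SqlClient, and System.IO).

//Sketch only: builds a BCP 8.0 format file from a staging table's schema.
public void CreateFormatFile(SqlConnection con, string tableName, string formatFilePath)
{
    //Read the table schema without fetching any rows
    SqlDataAdapter adapter = new SqlDataAdapter("SELECT * FROM [" + tableName + "] WHERE 1=0", con);
    DataTable schema = new DataTable();
    adapter.Fill(schema);

    using (StreamWriter writer = new StreamWriter(formatFilePath + tableName + ".fmt"))
    {
        int fieldCount = schema.Columns.Count + 1; //+1 for the leading row-delimiter field
        writer.WriteLine("8.0");                   //format file version for SQL Server 2000
        writer.WriteLine(fieldCount);
        writer.WriteLine("1 SQLCHAR 0 0 \"\\n\" 0 \"\" \"\"");
        for (int i = 0; i < schema.Columns.Count; i++)
        {
            //The last field ends the row with \r; the rest are pipe-delimited
            string delimiter = (i == schema.Columns.Count - 1) ? "\\r" : "|";
            writer.WriteLine(string.Format("{0} SQLCHAR 0 0 \"{1}\" {2} {3} Finnish_Swedish_CS_AS",
                i + 2, delimiter, i + 1, schema.Columns[i].ColumnName));
        }
    }
}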
  4. Use the Bulk Copy Program (BCP) with the created format file to upload data from the data files into a staging table. BCP also produces an output file and an error file.

I added the following nodes to my config file to specify the location of BCP and the switches it should use.

<add key="BCPexePath" value="C:\Program Files\Microsoft SQL Server\80\Tools\Binn\BCP.EXE"/>
<add key="BCPSwithches" value=" -n -t | -c  -b 50000"/>
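
For reference, once the switches and connection details are combined with a format file, the assembled command line resembles the following. The server name, login, and file paths are placeholders of mine, not values from the article, and the -n/-c/-t switches are omitted here because the format file already specifies the layout and terminators.

BCP.EXE ABC IN C:\Feeds\ABC.txt -b 50000 -S MyServer -U loader -P ***** -o RecordsProcessedErrABC.txt -e ErrABC.txt -f ABC.fmt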

Here is the code used to execute BCP to load the data.

//The files used to capture the record counts and errors thrown by BCP
strOutPutFileName=ConfigurationSettings.AppSettings["FormatFilePath"]+"RecordsProcessed"
       +ConfigurationSettings.AppSettings["ErrorFileNamePrefix"] + strLoaderName + ".txt";
strBCPErrorFileName=ConfigurationSettings.AppSettings["FormatFilePath"]
       +ConfigurationSettings.AppSettings["ErrorFileNamePrefix"] + strLoaderName + ".txt";
//Build the BCP arguments: table, IN, data file, switches, connection details,
//output/error files, the last row to copy (-L), and the format file (-f).
//The parentheses around (dblRecordCount+1) matter; without them the +1 would be
//appended as a string. strFormatFilename holds the nightly-generated format file.
proc.Arguments=strLoaderName + @" IN " + strReceivedFileName.ToString()
       + ConfigurationSettings.AppSettings["BCPSwithches"].Replace("|","\"|\" ")
       + objLibrary.Decrypt(objLibrary.Read("CONNECTION_BCP"))
       + " -o" + strOutPutFileName + " -e " + strBCPErrorFileName
       + " -L " + (dblRecordCount+1) + " -f" + strFormatFilename;
proc.UseShellExecute=false;
proc.RedirectStandardOutput=false;
proc.ErrorDialog=false;
//Begin the process asynchronously
p = System.Diagnostics.Process.Start(proc);
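
The article does not show how BCP failures are detected; one way, sketched here using the Process object p from above and a hypothetical event id, is to check the exit code after waiting:

//Sketch: a non-zero exit code means BCP failed; the details are in the
//-o and -e files captured above.
p.WaitForExit();
if (p.ExitCode != 0)
{
    objLibrary.EventID = 10001; //hypothetical event id
    objLibrary.ErrType = System.Diagnostics.EventLogEntryType.Error;
    objLibrary.ErrorRoutine("BCP failed for " + strLoaderName
        + "; see " + strBCPErrorFileName, "Loader");
}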
  5. Another set of format files is created to upload all failed records into an exception-management table. The static table contains the primary-key column number for each staging table, which is used to create these format files. For example, if the primary key is the 6th column, the following format file is created. The execution is the same as above, except this time all failed records are loaded into the exception table.


8.0
9
1 SQLCHAR 0 0 "\n" 0 "" ""
2 SQLCHAR 0 0 "\t" 0 a0 Finnish_Swedish_CS_AS
3 SQLCHAR 0 0 "\t" 0 a1 Finnish_Swedish_CS_AS
4 SQLCHAR 0 0 "\t" 0 a2 Finnish_Swedish_CS_AS
5 SQLCHAR 0 0 "\t" 0 a3 Finnish_Swedish_CS_AS
6 SQLCHAR 0 0 "\t" 0 a4 Finnish_Swedish_CS_AS
7 SQLCHAR 0 0 "\t" 0 a5 Finnish_Swedish_CS_AS
8 SQLCHAR 0 0 "\t" 1 a6 Finnish_Swedish_CS_AS
9 SQLCHAR 0 0 "\r" 0 a8 Finnish_Swedish_CS_AS
  6. Use the Bulk Copy Program (BCP) with the created format file to upload the failed data from the error log files into the exception-management table.

//Load the error records into the exception-management table
System.Diagnostics.ProcessStartInfo proc = new System.Diagnostics.ProcessStartInfo(strBCPPath);
proc.Arguments="FAILEDRECORDS" + @"  IN " + ConfigurationSettings.AppSettings["FormatFilePath"]
              + "Log" + oTable.Rows[j].ItemArray.GetValue(0).ToString() + "_Load.txt"
              + ConfigurationSettings.AppSettings["BCPErrorLoadSwithches"] + @" ""\t"" "
              + objLibrary.Decrypt(objLibrary.Read("CONNECTION_BCP"))
              + " -f" + strFormatFilename;
  7. After loading all the data files into the staging and exception-management tables, a normalizing process cleans up the staging tables using the exception-management table. The requirement is that a failed record must be removed from all extracts before they are loaded into the production tables, ensuring no partial records make it through. Another process therefore reads FAILEDRECORDS and deletes the matching rows from every staging table; this is the normalization step before uploading into the production tables (see the sketch below).
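
The article does not include the cleanup code; a minimal sketch of the idea, assuming an open SQLCon connection, the staging table and key-column names from the LoaderDetails static table, and a hypothetical FailedKey column in FAILEDRECORDS, might be:

//Sketch only: remove any staging row whose key appears in FAILEDRECORDS
private void Normalize_Staging_Table(string stagingTable, string keyColumn)
{
    SqlCommand cmd = new SqlCommand(
        "DELETE FROM [" + stagingTable + "] WHERE [" + keyColumn + "] IN " +
        "(SELECT FailedKey FROM FAILEDRECORDS)", SQLCon);
    cmd.CommandTimeout = 0;
    cmd.ExecuteNonQuery();
}
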
  8. Finally, we read the static table and execute the stored procedures that load data from the staging tables into the production tables. System.Threading is used for multithreading.
//Execute_Stored_Procedure is designed to handle multithreaded stored procedure executions
public void get_Stored_Procedure_Names()
{
 //Read the static table for the stored procedure names
 SqlDataAdapter dataadapter = new SqlDataAdapter
      ("SELECT L_StoredProcedureName FROM [LoaderDetails] WHERE [L_PensionerLoader]=1", SQLCon);
 DataTable dataTable = new DataTable();
 dataadapter.Fill(dataTable);
 SQLCon.Close();
 foreach(DataRow row in dataTable.Rows)
  {
   Thread thread=null;
   //The result set has a single column, so this inner loop runs once per row
   foreach(DataColumn col in dataTable.Columns)
    {
     lock(this)
      {
       strStoreProcedureName=row[col].ToString();
       if(ConfigurationSettings.AppSettings["ExeStoProcMultiThreading"]!="Yes")
        {
         //Run the procedure on the current thread
         Execute_Stored_Procedure();
        }
       else
        {
         //Run the procedure on its own worker thread
         thread = new Thread(new ThreadStart(Execute_Stored_Procedure));
         thread.Name = row[col].ToString();
         thread.Start();
        }
      }
    }
   //Note: this joins only the thread started for the current row
   //(see the note after this listing for waiting on all threads)
   if(ConfigurationSettings.AppSettings["ExeStoProcMultiThreading"]=="Yes")
    {
     thread.Join();
    }
  }
}
private void Execute_Stored_Procedure()
{
  try
   {
     cmdPublic=new SqlCommand();    
     cmdPublic.CommandTimeout=0;
     cmdPublic.CommandText=strStoreProcedureName;      
     cmdPublic.Connection=SQLCon2;            
     if (strStoreProcedureName!="")
      {
       if (ConfigurationSettings.AppSettings["ExeStoProcMultiThreading"]!="Yes")
        {
          cmdPublic.CommandTimeout=7000;         
          cmdPublic.ExecuteNonQuery();
        }
       else
        {
          //cmdPublic.ExecuteNonQuery();
          Thread thread = new Thread(new ThreadStart(ExecuteNonQuery));
          thread.Start();
        }
      }
    }
   catch(Exception ex)
    {
      objLibrary.EventID=10005;
      objLibrary.ErrType=System.Diagnostics.EventLogEntryType.Error;
      objLibrary.ErrorRoutine(" Failed Stored Procedure: " + strStoreProcedureName + " Message :" 
    + ex.Message,"Source"); 
    }
}
public void ExecuteNonQuery()
{
  try
   {
     cmdPublic.ExecuteNonQuery();
   }
  catch(Exception ex)
   {
     objLibrary.ErrorRoutine(true,ex);
   }
}
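
One caveat in the listing above: thread.Join() waits only for the last thread started. If the goal is to block until every stored procedure finishes, a common pattern, sketched here rather than taken from the article, is to collect the threads and join each one:

//Sketch: track every worker thread, then wait for all of them to finish
System.Collections.ArrayList threads = new System.Collections.ArrayList();
foreach(DataRow row in dataTable.Rows)
{
    Thread t = new Thread(new ThreadStart(Execute_Stored_Procedure));
    t.Start();
    threads.Add(t);
}
foreach(Thread t in threads)
{
    t.Join(); //block until this worker completes
}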

Conclusions

My objective was to create an application that is flexible, performant, and scalable. Although the design above sounds complicated, it accomplished all the required goals. The daily feed now processes all eighteen files at the same time and loads them into the production tables very efficiently.
