T-SQL insert using SSIS Data Pump


This article implements an automatization of SSIS Data Pump method described at http://www.sqlservercentral.com/articles/integration+services+(ssis)/72493/. The method SSIS Data Pump represents the most effective way to load large amount of data into a SQL Server database because the SSIS Data Pump makes optimal usage of the transaction logs, parallelism, locking, and pipelining. A more detailed explanation is in the referenced article.

Imagine the following scenario: I have a large filegroup containing 20 tables , and the filegroup has a lot of unused space. If I want to recover the unused space the best way is to create a new filegroup, rename the old tables , create the new tables on the new filegroup and then copy the rows from old tables into new tables . To copy 20 tables, I could manually create 20 SSIS packages and run them, but it would be nice to have a T-SQL stored procedure that, given a T-SQL select statement, executes it and loads the resulting rows into a given table using the SSIS Data Pump.

This article presents an implemented and tested solution that loads data using a dynamically generated SSIS package. The solution is made up of 2 components :

  1. An SSIS package named BULK_TRANSFER_DATA.dtsx
  2. the stored procedure Direct_Path_Insert

Here is a snapshot of how BULK_TRANSFER_DATA  looks like

The core of the solution is the VB script Create_DTSX, which creates an SSIS package using values contained into package variables that work as input parameters. This code is similar to that shown these two links:

The code is shown below:

 Public Sub Main()
        Dim packageName As String = Dts.Variables("DEST_TABLE_NAME").Value.ToString

        Dim threads As Integer = 1
        orderNumber = 1
        app = New Application()
        Dts.Events.FireInformation(0, "Bulk Transfer Package", packageName, "", 0, True)
        Dim srcDbConnectionString As String = Dts.Variables("SOURCE_CONNECT").Value.ToString
        Dim destDbConnectionString As String = Dts.Variables("DEST_CONNECT").Value.ToString
        'Create package
        dataTransferPackage = New Package()
        dataTransferPackage.LocaleID = 1029
        'Create SourceConection
        Dim srcConn As Microsoft.SqlServer.Dts.Runtime.ConnectionManager = dataTransferPackage.Connections.Add("OLEDB")
        srcConn.Name = "OLEDB Source"
        srcConn.ConnectionString = srcDbConnectionString
        'Create DestConection
        Dim destConn As Microsoft.SqlServer.Dts.Runtime.ConnectionManager = dataTransferPackage.Connections.Add("OLEDB")
        destConn.Name = "OLEDB Destination"
        destConn.ConnectionString = destDbConnectionString

        Dim SQLCommand As String = Dts.Variables("SOURCE_SELECT").Value.ToString
        Dim schemaTableName As String = Dts.Variables("DEST_TABLE_NAME").Value.ToString
        'Create DataFlow task
        Dim ePipeline As Executable = dataTransferPackage.Executables.Add("STOCK:PipelineTask")
        Dim thMainPipe As Microsoft.SqlServer.Dts.Runtime.TaskHost = CType(ePipeline, Microsoft.SqlServer.Dts.Runtime.TaskHost)
        thMainPipe.Name = Format(orderNumber, "000000") + "_copy_Table_Task_" + schemaTableName
        orderNumber += 1
        Dim dataFlowTask As MainPipe = CType(thMainPipe.InnerObject, MainPipe)
        dataFlowTask.DefaultBufferSize = 50 * 1024 * 1024
' FastLoadMaxInsertCommitSize will have the same value of DefaultBufferMaxRows 
' It represents the size in rows of a commited batch
' If I want to have all in a single transaction, must choose          
        dataFlowTask.DefaultBufferMaxRows = 20000
        ' dataFlowTask.BLOBTempStoragePath = dataTransferCache
        ' dataFlowTask.BufferTempStoragePath = dataTransferCache
        Dts.Events.FireInformation(0, "DataFlow task created for table", schemaTableName, "", 0, True)
        'Create OLEDB Source
        Dts.Events.FireInformation(0, "Creating OLEDB source - for table", schemaTableName, "", 0, True)
        Dim compSrc As IDTSComponentMetaData100 = dataFlowTask.ComponentMetaDataCollection.New()
        compSrc.ComponentClassID = "DTSAdapter.OleDbSource.2"
        compSrc.Name = "OLEDBSource_" + schemaTableName

        ' Initialize the component.
        Dim instSrc As CManagedComponentWrapper = compSrc.Instantiate()
        'Dts.Events.FireInformation(0, "Set Connection OLEDB source - for table", "[" + schemaName + "].[" + tableName + "]", "", 0, True)
        compSrc.RuntimeConnectionCollection(0).ConnectionManagerID = dataTransferPackage.Connections("OLEDB Source").ID
        compSrc.RuntimeConnectionCollection(0).ConnectionManager = DtsConvert.GetExtendedInterface(dataTransferPackage.Connections("OLEDB Source"))
        compSrc.CustomPropertyCollection.Item("DefaultCodePage").Value = 1250
        Dts.Events.FireInformation(0, "OLEDB source query", SQLCommand, "", 0, True)
        instSrc.SetComponentProperty("AccessMode", 2) ' Table or view ... 0, SQL Command ... 2, Table or view - fast 3
        instSrc.SetComponentProperty("SqlCommand", SQLCommand)
        ' Reinitialize the metadata.
        instSrc = Nothing
        compSrc.Name = "OLEDBSource_" + schemaTableName
        Dim srcOutput As IDTSOutput100 = compSrc.OutputCollection(0)
        Dts.Events.FireInformation(0, "SSIS Tasks created for table", schemaTableName, "", 0, True)
        'Create OLEDB Destination
        Dts.Events.FireInformation(0, "Creating OLEDB dest - for table", schemaTableName, "", 0, True)
        Dim compDest As IDTSComponentMetaData100 = dataFlowTask.ComponentMetaDataCollection.New()
        compDest.ComponentClassID = "DTSAdapter.OleDbDestination.2"
        compDest.Name = "OLEDBDest_" + schemaTableName
        ' Initialize the component.
        Dim instDest As CManagedComponentWrapper = compDest.Instantiate()
        compDest.RuntimeConnectionCollection(0).ConnectionManagerID = dataTransferPackage.Connections("OLEDB Destination").ID
        compDest.RuntimeConnectionCollection(0).ConnectionManager = DtsConvert.GetExtendedInterface(dataTransferPackage.Connections("OLEDB Destination"))
        compDest.CustomPropertyCollection.Item("DefaultCodePage").Value = 1250
        ' Set the custom properties
        instDest.SetComponentProperty("AccessMode", 3)
        instDest.SetComponentProperty("OpenRowset", "DBO." + schemaTableName)
        Dts.Events.FireInformation(0, "instDest", schemaTableName, "", 0, True)
        instDest.SetComponentProperty("FastLoadKeepIdentity", True)
        instDest.SetComponentProperty("FastLoadKeepNulls", True)
        instDest.SetComponentProperty("FastLoadMaxInsertCommitSize", dataFlowTask.DefaultBufferMaxRows)
        instDest.SetComponentProperty("FastLoadOptions", "CHECK_CONSTRAINTS ")
        ',ROWS_PER_BATCH = " + dataFlowTask.DefaultBufferMaxRows.ToString())
        Dim destInput As IDTSInput100 = compDest.InputCollection(0)
        Dim path As IDTSPath100 = dataFlowTask.PathCollection.New()
        path.AttachPathAndPropagateNotifications(srcOutput, destInput)
        ' Reinitialize the metadata.
        compDest.Name = "OLEDBDest_" + schemaTableName
        Dts.Events.FireInformation(0, "instDest_ioriuno", schemaTableName, "", 0, True)
        Dim vdestInput As IDTSVirtualInput100 = destInput.GetVirtualInput()
        For Each vColumn As IDTSVirtualInputColumn100 In vdestInput.VirtualInputColumnCollection
            Dim vCol As IDTSInputColumn100 = instDest.SetUsageType(destInput.ID, vdestInput, vColumn.LineageID, DTSUsageType.UT_READWRITE)
            instDest.MapInputColumn(destInput.ID, vCol.ID, destInput.ExternalMetadataColumnCollection(vColumn.Name).ID)
        instDest = Nothing
        dataTransferPackage.MaxConcurrentExecutables = threads
        dataTransferPackage.MaximumErrorCount = 100
        app.SaveToXml(packageName + ".dtsx", dataTransferPackage, Nothing)
        Dts.TaskResult = ScriptResults.Success
    End Sub

The BULK_TRANSFER_DATA first creates an SSIS package named the same as the destination table. It then executes the package using a built-in SSIS ExecutePackage component before and deleting the package. The dynamically created package contains a DataFlow that contains a simple OleDb Source - OleDb Destination mapping.

The OleDb Source is created from the value of the package variable SOURCE_SELECT, which is the concatenation of the variables SELECT_CLAUSE + WHERE_CONDITION. The OleDB Destination inserts data into a table named the same as the value of the variable DEST_TABLE_NAME.

Let's now look at the stored procedure. The procedure first creates a Windows batch file named the same as the destination_table and then launches it. The batch file sets the working directory as the value of @work_dir, and then launches BULK_TRANSFER_DATA.

The prerequistes for Direct_Path_Insert to run are :

  1. The destination table exists
  2. The columns generated by the source select must have the identical name (case sensitive), datatype, and length as the columns of the destination table.

The procedure code is shown here:

Created by F Iori  20110427
Perform Bulk Insert, generating dynamically a dtsx , running it , deleting it
Setup : copy into @work_dir on the server BULK_TRANSFER_DATA.dtsx and thats all
@dest_tab must be without schema name , is dbo by default
@select_clause column names are case sensitive , they should be the same name, type and number as dest_tab
elseway the proc will fail
 @dest_tab varchar(200) = 'drop_GSM_CDR_PTC_TRAFFIC_ACTUAL' -- Schema is always dbo
, @src_db varchar(200) = 'DWHSK_TARGET_PROD'
, @dest_db varchar(200) = 'DWHSK_STAGE_PROD'
, @instance_name varchar(50) = 'NTSQLDWHSKT01\I03'
-- select clause column names are case sensitive
,@select_clause varchar(2000) =
'select top 350 CDR_ID, File_ID, Exchange_ID , Charging_Start_Time , Create_Date  from GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=1 '
, @wherecond varchar(2000) = ' and Charging_Start_Time between ''2011-03-01'' and ''2011-03-02'' ' -- or empty string
exec dwhsk_warehouse.dbo.Direct_Path_Insert @dest_tab  , @src_db , @dest_db ,
 @instance_name , @select_clause ,  @wherecond  
CREATE proc [dbo].[Direct_Path_Insert] (
 @dest_tab varchar(200) , @src_db varchar(50), @dest_db varchar(50),
 @instance_name varchar(50), @select_clause varchar(2000) ,
 @wherecond varchar(2000)  ) as
 @ret int , @cmd varchar(8000) = '' , @cmds varchar(8000) = ''
, @work_dir varchar(100)= 'N:\Data1\SS_Direct_Path_Insert\'
  set @cmd =
     ' DTexec.exe /File  "BULK_TRANSFER_DATA.dtsx"  '
   + ' /SET "\package.Variables[User::DEST_TABLE_NAME].Value";"'+@dest_tab+'" '
   + ' /SET "\package.Variables[User::SOURCE_DB].Value";"'+@src_db+'" '
   + ' /SET "\package.Variables[User::DEST_DB].Value";"'+@dest_db+'" '
   + ' /SET "\package.Variables[User::INSTANCE_NAME].Value";"'+@instance_name+'" '
   + ' /SET "\package.Variables[User::SELECT_CLAUSE].Value";"'+@select_clause+'" '
   + ' /SET "\package.Variables[User::WHERE_CONDITION].Value";"'+@wherecond+'" '
 print @cmd
 set @cmds=    ' del '+@work_dir+@dest_tab+'.* '
    exec  @ret= master.sys.xp_cmdshell @cmds , no_output
  set @cmds= 'echo '+LEFT(@work_dir,1)+': > '+@work_dir+@dest_tab+'.bat '
  exec  @ret= master.sys.xp_cmdshell @cmds , no_output
  set @cmds= 'echo cd '+@work_dir+' >> '+@work_dir+@dest_tab+'.bat '
  exec  @ret= master.sys.xp_cmdshell @cmds , no_output
 set @cmds=' echo '+@cmd+' >> '+@work_dir+@dest_tab+'.bat '
  exec  @ret= master.sys.xp_cmdshell @cmds , no_output
set @cmds = ' '+@work_dir+@dest_tab+'.bat '
   exec  @ret= master.sys.xp_cmdshell @cmds ,  no_output
  if @ret<>0  
     declare @errmsg varchar(300) = @dest_tab+'.bat failed '
     raiserror (@errmsg, 12, 1)
 set @cmds=    ' del '+@work_dir+@dest_tab+'.bat '
    exec  @ret= master.sys.xp_cmdshell @cmds , no_output

Installation of this system is very easy :

  1. On the database server create a folder named the same as the variable @work_dir defined in the stored procedure Direct_Path_Insert.
  2. Copy into the folder @work_dir, the SSIS package BULK_TRANSFER_DATA.dtsx
  3. Create the stored procedure Direct_Path_Insert on any db of the instance.

Now the stored procedure can be called from a T-SQL script as :

exec Direct_Path_Insert <destination_table> , <source_db_name>, <destination_db_name>, @@servername, <source_select >, <where_condition for source select>

Example :

-- Create destination table 
select top 250 [CDR_ID]
      , Charging_Start_Time
      , Create_Date
into dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL
from dwhsk_target_prod..GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=2
-- Loads some rows into destination table 
 @dest_tab varchar(200) = 'drop_GSM_CDR_PTC_TRAFFIC_ACTUAL' -- Schema is always dbo
, @src_db varchar(200) = 'DWHSK_TARGET_PROD'
, @dest_db varchar(200) = 'DWHSK_STAGE_PROD'
, @instance_name varchar(50) = 'NTSQLDWHSKT01\I03'
,@select_clause varchar(2000) =
 ' select CDR_ID, File_ID, Exchange_ID , Charging_Start_Time , Create_Date  from GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=1 '
, @wherecond varchar(2000) = ' and Charging_Start_Time between ''2011-03-01'' and ''2011-03-02'' ' -- or empty string
exec dwhsk_warehouse.dbo.Direct_Path_Insert @dest_tab  , @src_db , @dest_db ,
 @instance_name , @select_clause ,  @wherecond
select * from dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL
drop table dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL

At the end, the dynamically generated SSIS packages and Windows batch files are deleted from the working directory on the database server.

Please note that the insert operation performed by Direct_Path_Insert is not atomic, but commits inserted rows in batchs of 20000 rows. That is the value of dataFlowTask.DefaultBufferMaxRows in the VB script. If an error occurs,  only the last batch is rolled back.

To have atomic behaviour, just set FastLoadMaxInsertCommitSize to 0 and all the inserted rows will be committed or rolled back in a single batch.

The name of the stored procedure Direct_Path_Insert has been chosen because offers similar functionality as Oracle Direct Path Insert ( insert with hint APPEND).



3.88 (8)




3.88 (8)