Click here to monitor SSC
SQLServerCentral is supported by Red Gate Software Ltd.
 
Log in  ::  Register  ::  Not logged in
 
 
 

T-SQL insert using SSIS Data Pump

By Federico Iori,

This article implements an automatization of SSIS Data Pump method described at http://www.sqlservercentral.com/articles/integration+services+(ssis)/72493/. The method SSIS Data Pump represents the most effective way to load large amount of data into a SQL Server database because the SSIS Data Pump makes optimal usage of the transaction logs, parallelism, locking, and pipelining. A more detailed explanation is in the referenced article.

Imagine the following scenario: I have a large filegroup containing 20 tables , and the filegroup has a lot of unused space. If I want to recover the unused space the best way is to create a new filegroup, rename the old tables , create the new tables on the new filegroup and then copy the rows from old tables into new tables . To copy 20 tables, I could manually create 20 SSIS packages and run them, but it would be nice to have a T-SQL stored procedure that, given a T-SQL select statement, executes it and loads the resulting rows into a given table using the SSIS Data Pump.

This article presents an implemented and tested solution that loads data using a dynamically generated SSIS package. The solution is made up of 2 components :

  1. An SSIS package named BULK_TRANSFER_DATA.dtsx
  2. the stored procedure Direct_Path_Insert

Here is a snapshot of how BULK_TRANSFER_DATA  looks like

The core of the solution is the VB script Create_DTSX, which creates an SSIS package using values contained into package variables that work as input parameters. This code is similar to that shown these two links:

The code is shown below:

 Public Sub Main()

        Dim packageName As String = Dts.Variables("DEST_TABLE_NAME").Value.ToString


        Dim threads As Integer = 1

        orderNumber = 1

        '--------------------------------------------------------------------------------------------
        app = New Application()

        Dts.Events.FireInformation(0, "Bulk Transfer Package", packageName, "", 0, True)

        Dim srcDbConnectionString As String = Dts.Variables("SOURCE_CONNECT").Value.ToString
        Dim destDbConnectionString As String = Dts.Variables("DEST_CONNECT").Value.ToString
        '--------------------------------------------------------------------------------------------
        'Create package
        dataTransferPackage = New Package()
        dataTransferPackage.LocaleID = 1029

        'Create SourceConection
        Dim srcConn As Microsoft.SqlServer.Dts.Runtime.ConnectionManager = dataTransferPackage.Connections.Add("OLEDB")
        srcConn.Name = "OLEDB Source"
        srcConn.ConnectionString = srcDbConnectionString

        'Create DestConection
        Dim destConn As Microsoft.SqlServer.Dts.Runtime.ConnectionManager = dataTransferPackage.Connections.Add("OLEDB")
        destConn.Name = "OLEDB Destination"
        destConn.ConnectionString = destDbConnectionString


        Dim SQLCommand As String = Dts.Variables("SOURCE_SELECT").Value.ToString
        Dim schemaTableName As String = Dts.Variables("DEST_TABLE_NAME").Value.ToString

        '------------------------------------------------------
        'Create DataFlow task
        Dim ePipeline As Executable = dataTransferPackage.Executables.Add("STOCK:PipelineTask")
        Dim thMainPipe As Microsoft.SqlServer.Dts.Runtime.TaskHost = CType(ePipeline, Microsoft.SqlServer.Dts.Runtime.TaskHost)
        thMainPipe.Name = Format(orderNumber, "000000") + "_copy_Table_Task_" + schemaTableName
        orderNumber += 1
        Dim dataFlowTask As MainPipe = CType(thMainPipe.InnerObject, MainPipe)

        dataFlowTask.DefaultBufferSize = 50 * 1024 * 1024
' FastLoadMaxInsertCommitSize will have the same value of DefaultBufferMaxRows 
' It represents the size in rows of a commited batch
' If I want to have all in a single transaction, must choose          
        dataFlowTask.DefaultBufferMaxRows = 20000
        
        ' dataFlowTask.BLOBTempStoragePath = dataTransferCache
        ' dataFlowTask.BufferTempStoragePath = dataTransferCache

        Dts.Events.FireInformation(0, "DataFlow task created for table", schemaTableName, "", 0, True)
        '--------------------------------------------------------------------------------------------
        'Create OLEDB Source
        Dts.Events.FireInformation(0, "Creating OLEDB source - for table", schemaTableName, "", 0, True)

        Dim compSrc As IDTSComponentMetaData100 = dataFlowTask.ComponentMetaDataCollection.New()
        compSrc.ComponentClassID = "DTSAdapter.OleDbSource.2"
        compSrc.Name = "OLEDBSource_" + schemaTableName


        ' Initialize the component.
        Dim instSrc As CManagedComponentWrapper = compSrc.Instantiate()
        instSrc.ProvideComponentProperties()

        'Dts.Events.FireInformation(0, "Set Connection OLEDB source - for table", "[" + schemaName + "].[" + tableName + "]", "", 0, True)
        compSrc.RuntimeConnectionCollection(0).ConnectionManagerID = dataTransferPackage.Connections("OLEDB Source").ID
        compSrc.RuntimeConnectionCollection(0).ConnectionManager = DtsConvert.GetExtendedInterface(dataTransferPackage.Connections("OLEDB Source"))
        compSrc.CustomPropertyCollection.Item("DefaultCodePage").Value = 1250

        Dts.Events.FireInformation(0, "OLEDB source query", SQLCommand, "", 0, True)

        instSrc.SetComponentProperty("AccessMode", 2) ' Table or view ... 0, SQL Command ... 2, Table or view - fast 3
        instSrc.SetComponentProperty("SqlCommand", SQLCommand)

        ' Reinitialize the metadata.
        instSrc.AcquireConnections(vbNull)
        instSrc.ReinitializeMetaData()
        instSrc.ReleaseConnections()
        instSrc = Nothing

        compSrc.Name = "OLEDBSource_" + schemaTableName

        Dim srcOutput As IDTSOutput100 = compSrc.OutputCollection(0)

        Dts.Events.FireInformation(0, "SSIS Tasks created for table", schemaTableName, "", 0, True)
        '--------------------------------------------------------------------------------------------
        'Create OLEDB Destination
        Dts.Events.FireInformation(0, "Creating OLEDB dest - for table", schemaTableName, "", 0, True)

        Dim compDest As IDTSComponentMetaData100 = dataFlowTask.ComponentMetaDataCollection.New()
        compDest.ComponentClassID = "DTSAdapter.OleDbDestination.2"
        compDest.Name = "OLEDBDest_" + schemaTableName

        ' Initialize the component.
        Dim instDest As CManagedComponentWrapper = compDest.Instantiate()
        instDest.ProvideComponentProperties()

        compDest.RuntimeConnectionCollection(0).ConnectionManagerID = dataTransferPackage.Connections("OLEDB Destination").ID
        compDest.RuntimeConnectionCollection(0).ConnectionManager = DtsConvert.GetExtendedInterface(dataTransferPackage.Connections("OLEDB Destination"))
        compDest.CustomPropertyCollection.Item("DefaultCodePage").Value = 1250

        ' Set the custom properties
        instDest.SetComponentProperty("AccessMode", 3)
        instDest.SetComponentProperty("OpenRowset", "DBO." + schemaTableName)

        Dts.Events.FireInformation(0, "instDest", schemaTableName, "", 0, True)

        instDest.SetComponentProperty("FastLoadKeepIdentity", True)
        instDest.SetComponentProperty("FastLoadKeepNulls", True)
        instDest.SetComponentProperty("FastLoadMaxInsertCommitSize", dataFlowTask.DefaultBufferMaxRows)
        instDest.SetComponentProperty("FastLoadOptions", "CHECK_CONSTRAINTS ")
        ',ROWS_PER_BATCH = " + dataFlowTask.DefaultBufferMaxRows.ToString())

        Dim destInput As IDTSInput100 = compDest.InputCollection(0)

        Dim path As IDTSPath100 = dataFlowTask.PathCollection.New()
        path.AttachPathAndPropagateNotifications(srcOutput, destInput)

        ' Reinitialize the metadata.
        instDest.AcquireConnections(vbNull)
        instDest.ReinitializeMetaData()
        compDest.Name = "OLEDBDest_" + schemaTableName

        Dts.Events.FireInformation(0, "instDest_ioriuno", schemaTableName, "", 0, True)

        Dim vdestInput As IDTSVirtualInput100 = destInput.GetVirtualInput()

        For Each vColumn As IDTSVirtualInputColumn100 In vdestInput.VirtualInputColumnCollection
            Dim vCol As IDTSInputColumn100 = instDest.SetUsageType(destInput.ID, vdestInput, vColumn.LineageID, DTSUsageType.UT_READWRITE)
            instDest.MapInputColumn(destInput.ID, vCol.ID, destInput.ExternalMetadataColumnCollection(vColumn.Name).ID)
        Next

        instDest.ReleaseConnections()
        instDest = Nothing

        '--------------------------------------------------------------------------------------------

        dataTransferPackage.MaxConcurrentExecutables = threads

        dataTransferPackage.MaximumErrorCount = 100

        app.SaveToXml(packageName + ".dtsx", dataTransferPackage, Nothing)

        Dts.TaskResult = ScriptResults.Success
    End Sub

The BULK_TRANSFER_DATA first creates an SSIS package named the same as the destination table. It then executes the package using a built-in SSIS ExecutePackage component before and deleting the package. The dynamically created package contains a DataFlow that contains a simple OleDb Source - OleDb Destination mapping.

The OleDb Source is created from the value of the package variable SOURCE_SELECT, which is the concatenation of the variables SELECT_CLAUSE + WHERE_CONDITION. The OleDB Destination inserts data into a table named the same as the value of the variable DEST_TABLE_NAME.

Let's now look at the stored procedure. The procedure first creates a Windows batch file named the same as the destination_table and then launches it. The batch file sets the working directory as the value of @work_dir, and then launches BULK_TRANSFER_DATA.

The prerequistes for Direct_Path_Insert to run are :

  1. The destination table exists
  2. The columns generated by the source select must have the identical name (case sensitive), datatype, and length as the columns of the destination table.

The procedure code is shown here:

/*
Created by F Iori  20110427
Perform Bulk Insert, generating dynamically a dtsx , running it , deleting it
Setup : copy into @work_dir on the server BULK_TRANSFER_DATA.dtsx and thats all
@dest_tab must be without schema name , is dbo by default
@select_clause column names are case sensitive , they should be the same name, type and number as dest_tab
elseway the proc will fail

*/

/*
declare
 @dest_tab varchar(200) = 'drop_GSM_CDR_PTC_TRAFFIC_ACTUAL' -- Schema is always dbo
, @src_db varchar(200) = 'DWHSK_TARGET_PROD'
, @dest_db varchar(200) = 'DWHSK_STAGE_PROD'
, @instance_name varchar(50) = 'NTSQLDWHSKT01\I03'
-- select clause column names are case sensitive
,@select_clause varchar(2000) =
'select top 350 CDR_ID, File_ID, Exchange_ID , Charging_Start_Time , Create_Date  from GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=1 '
, @wherecond varchar(2000) = ' and Charging_Start_Time between ''2011-03-01'' and ''2011-03-02'' ' -- or empty string

exec dwhsk_warehouse.dbo.Direct_Path_Insert @dest_tab  , @src_db , @dest_db ,
 @instance_name , @select_clause ,  @wherecond  
*/
CREATE proc [dbo].[Direct_Path_Insert] (
 @dest_tab varchar(200) , @src_db varchar(50), @dest_db varchar(50),
 @instance_name varchar(50), @select_clause varchar(2000) ,
 @wherecond varchar(2000)  ) as
declare  
 @ret int , @cmd varchar(8000) = '' , @cmds varchar(8000) = ''
, @work_dir varchar(100)= 'N:\Data1\SS_Direct_Path_Insert\'
 begin
  set @cmd =
     ' DTexec.exe /File  "BULK_TRANSFER_DATA.dtsx"  '
   + ' /SET "\package.Variables[User::DEST_TABLE_NAME].Value";"'+@dest_tab+'" '
   + ' /SET "\package.Variables[User::SOURCE_DB].Value";"'+@src_db+'" '
   + ' /SET "\package.Variables[User::DEST_DB].Value";"'+@dest_db+'" '
   + ' /SET "\package.Variables[User::INSTANCE_NAME].Value";"'+@instance_name+'" '
   + ' /SET "\package.Variables[User::SELECT_CLAUSE].Value";"'+@select_clause+'" '
   + ' /SET "\package.Variables[User::WHERE_CONDITION].Value";"'+@wherecond+'" '
   
 print @cmd
 set @cmds=    ' del '+@work_dir+@dest_tab+'.* '
    exec  @ret= master.sys.xp_cmdshell @cmds , no_output
  set @cmds= 'echo '+LEFT(@work_dir,1)+': > '+@work_dir+@dest_tab+'.bat '
  exec  @ret= master.sys.xp_cmdshell @cmds , no_output
  set @cmds= 'echo cd '+@work_dir+' >> '+@work_dir+@dest_tab+'.bat '
  exec  @ret= master.sys.xp_cmdshell @cmds , no_output
 set @cmds=' echo '+@cmd+' >> '+@work_dir+@dest_tab+'.bat '
  exec  @ret= master.sys.xp_cmdshell @cmds , no_output
set @cmds = ' '+@work_dir+@dest_tab+'.bat '
   exec  @ret= master.sys.xp_cmdshell @cmds ,  no_output
  if @ret<>0  
    BEGIN
     declare @errmsg varchar(300) = @dest_tab+'.bat failed '
     raiserror (@errmsg, 12, 1)
      RETURN
    END
 set @cmds=    ' del '+@work_dir+@dest_tab+'.bat '
    exec  @ret= master.sys.xp_cmdshell @cmds , no_output
 end

Installation of this system is very easy :

  1. On the database server create a folder named the same as the variable @work_dir defined in the stored procedure Direct_Path_Insert.
  2. Copy into the folder @work_dir, the SSIS package BULK_TRANSFER_DATA.dtsx
  3. Create the stored procedure Direct_Path_Insert on any db of the instance.

Now the stored procedure can be called from a T-SQL script as :

exec Direct_Path_Insert <destination_table> , <source_db_name>, <destination_db_name>, @@servername, <source_select >, <where_condition for source select>

Example :

use DWHSK_STAGE_PROD
go
-- Create destination table 
select top 250 [CDR_ID]
      ,[File_ID]
      ,[Exchange_ID]
      , Charging_Start_Time
      , Create_Date
into dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL
from dwhsk_target_prod..GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=2
-- Loads some rows into destination table 
declare
 @dest_tab varchar(200) = 'drop_GSM_CDR_PTC_TRAFFIC_ACTUAL' -- Schema is always dbo
, @src_db varchar(200) = 'DWHSK_TARGET_PROD'
, @dest_db varchar(200) = 'DWHSK_STAGE_PROD'
, @instance_name varchar(50) = 'NTSQLDWHSKT01\I03'
,@select_clause varchar(2000) =
 ' select CDR_ID, File_ID, Exchange_ID , Charging_Start_Time , Create_Date  from GSM_CDR_PTC_TRAFFIC_ACTUAL where 1=1 '
, @wherecond varchar(2000) = ' and Charging_Start_Time between ''2011-03-01'' and ''2011-03-02'' ' -- or empty string

exec dwhsk_warehouse.dbo.Direct_Path_Insert @dest_tab  , @src_db , @dest_db ,
 @instance_name , @select_clause ,  @wherecond

select * from dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL

drop table dwhsk_stage_prod.dbo.drop_GSM_CDR_PTC_TRAFFIC_ACTUAL

At the end, the dynamically generated SSIS packages and Windows batch files are deleted from the working directory on the database server.

Please note that the insert operation performed by Direct_Path_Insert is not atomic, but commits inserted rows in batchs of 20000 rows. That is the value of dataFlowTask.DefaultBufferMaxRows in the VB script. If an error occurs,  only the last batch is rolled back.

To have atomic behaviour, just set FastLoadMaxInsertCommitSize to 0 and all the inserted rows will be committed or rolled back in a single batch.

The name of the stored procedure Direct_Path_Insert has been chosen because offers similar functionality as Oracle Direct Path Insert ( insert with hint APPEND).

Resources:

Direct_Path_Insert.sql | BULK_TRANSFER_DATA.dtsx
Total article views: 7353 | Views in the last 30 days: 6
 
Related Articles
FORUM

Cast /Covert from nvarchar to varchar...during select in oledb src

cast during select in oledb src

FORUM

Insert - Exec Select statements

Insert - Exec Select statements

FORUM

Blocking Issue:Insert blocking select statements

Insert blocking select statements

FORUM

Tricky ...VARCHAR

VARCHAR logics

FORUM

Bulk Insert of image data via OleDb

Error when bulk inserting image data via OleDb on SQL Server 8.0 (MSDE)

Tags
bulk insert    
data pump task    
etl    
integration services (ssis)    
t-sql    
 
Contribute

Join the most active online SQL Server Community

SQL knowledge, delivered daily, free:

Email address:  

You make SSC a better place

As a member of SQLServerCentral, you get free access to loads of fresh content: thousands of articles and SQL scripts, a library of free eBooks, a weekly database news roundup, a great Q & A platform… And it’s our huge, buzzing community of SQL Server Professionals that makes it such a success.

Join us!

Steve Jones
Editor, SQLServerCentral.com

Already a member? Jump in:

Email address:   Password:   Remember me: Forgotten your password?
Steve Jones