SQL 2008 MERGE using Dynamic SQL over a Linked Server

,

Yet another reason to upgrade to 2008: MERGE. I wrote this for a BI project I was working on recently. This SP is self-explanatory: it uses dynamic SQL to build a MERGE statement over a linked server, with some additional parameters (e.g. Oracle vs. SQL Server as the source, limiting the source data, etc.).

I'm using this in combination with SSIS as part of an ETL task: a data flow task iterates through sys.tables for a given schema and puts the table names into an ADO recordset object, and a Foreach Loop container then executes this SP for each table using an expression.

Enjoy! (just give me some credit if you use it :)-)

-- Remove any previous version of the procedure so the CREATE below can run.
IF OBJECT_ID('dbo.usp_dynamic_merge') IS NOT NULL
BEGIN
    DROP PROCEDURE dbo.usp_dynamic_merge;
END
GO

-- MERGE was introduced in SQL Server 2008 (product major version 10).
-- Compare the numeric major version rather than pattern-matching the @@VERSION
-- banner: the old '%10.0.%' test raised this error on 2008 R2 (10.50) and on
-- every later release, contradicting the "2008 and above" message.
-- Note: the error is informational only; the following batches still run.
IF CAST(PARSENAME(CAST(SERVERPROPERTY('ProductVersion') AS nvarchar(128)), 4) AS int) < 10
    RAISERROR ('MERGE is only supported on SQL 2008 and above.', 16, 1)
go

-- QUOTED_IDENTIFIER and ANSI_NULLS are captured when the procedure is created.
-- The xml .value() method used inside the procedure requires QUOTED_IDENTIFIER ON.
SET ANSI_NULLS ON
SET QUOTED_IDENTIFIER ON
GO

CREATE PROCEDURE dbo.usp_dynamic_merge
 @local_schema_name sysname,
 @remote_server_name sysname,
 @remote_database_name sysname,
 @remote_schema_name sysname,
 @local_table_name sysname, 
 @remote_table_name sysname,
 @merge_last_modified_enabled bit,
 @merge_last_modified_column sysname,
 @merge_last_modified_dt datetime,
 @merge_delete_enabled bit,
 @is_oracle bit,
 @debug_enabled bit,
 @output_enabled bit
 WITH ENCRYPTION
AS

/* 
Name:          dbo.usp_dynamic_merge
Contact:          Tommy Bollhofer (tbollhofer2@gmail.com)
Purpose:          Uses dynamic SQL to build and execute a MERGE statement across a linked server. 
Last Modified:          03/24/2009

Parameters:
 @local_schema_name            Schema of the local (target) table.
 @remote_server_name           Linked server name passed to OPENQUERY.
 @remote_database_name         Remote database; NULL produces a two-part remote name (schema.table).
 @remote_schema_name           Remote schema (owner) of the source table.
 @local_table_name             Local (target) table. It must have a primary key; the key columns
                               form the MERGE join condition.
 @remote_table_name            Remote (source) table.
 @merge_last_modified_enabled  1 = restrict the remote rows to those modified on/after the cut-off.
 @merge_last_modified_column   Remote column compared against the cut-off.
 @merge_last_modified_dt       Cut-off; NULL defaults to midnight (UTC date) three days ago.
 @merge_delete_enabled         1 = delete target rows that are missing from the source.
 @is_oracle                    1 = emit an Oracle TO_DATE() cut-off, 0 = emit a plain date literal.
                               NULL adds no cut-off filter at all.
 @debug_enabled                0 = execute the statement; any other value (including NULL) prints it.
 @output_enabled               1 = return one row per (table_name, operation) with a row count.

Notes:
 - The source column list is taken from the LOCAL table (computed columns excluded), so the
   remote table must expose the same column names.
 - Local identifiers are wrapped with QUOTENAME. Remote identifiers are passed through as
   supplied because the remote quoting rules are unknown (Oracle folds unquoted names to
   upper case); single quotes are doubled so they cannot terminate the OPENQUERY literal.
 - Invalid input (missing table, missing primary key, NULL required argument) raises an
   error instead of silently doing nothing.

Usage Examples:

This example will execute the procedure using a remote ORACLE RDBMS as the source, limit the number of rows returned using 
'MODIFY_DT', merging on the primary/composite key, delete disabled, output enabled, and with debug enabled (i.e. printing the 
dynamic sql statement as opposed to executing it): 

EXEC dbo.usp_dynamic_merge
        @local_schema_name = 'dbo',
 @remote_server_name = 'ORACLE_10G',
 @remote_database_name = NULL,
 @remote_schema_name = 'FOO',
 @local_table_name = 'TEST', 
 @remote_table_name = 'TEST', 
 @merge_last_modified_enabled = 1,
 @merge_last_modified_column = 'MODIFY_DT',
 @merge_last_modified_dt = NULL,
 @merge_delete_enabled = 0,
 @is_oracle = 1,
 @debug_enabled = 1,
 @output_enabled = 1

This example will execute the procedure using a remote MSSQL RDBMS as the source, passing the (3) part name, without limits on the 
number of rows returned at the source merging on the primary/composite key, delete disabled, output enabled, and with debug enabled: 

EXEC dbo.usp_dynamic_merge
        @local_schema_name = 'Person',
 @remote_server_name = 'SQL_2008',
 @remote_database_name = 'AdventureWorks2008',
 @remote_schema_name = 'Person',
 @local_table_name = 'Person', 
 @remote_table_name = 'Person', 
 @merge_last_modified_enabled = 0,
 @merge_last_modified_column = NULL,
 @merge_last_modified_dt = NULL,
 @merge_delete_enabled = 0,
 @is_oracle = 0,
 @debug_enabled = 1,
 @output_enabled = 1

*/

SET NOCOUNT ON 

DECLARE @local_object_id int,
        @target_name nvarchar(max),
        @target_alias nvarchar(max),
        @source_alias nvarchar(max),
        @remote_object_name nvarchar(max),
        @remote_column_list nvarchar(max),
        @remote_query nvarchar(max),
        @cutoff_text char(19),
        @merge_join_clause nvarchar(max),
        @local_column_list nvarchar(max),
        @dynamic_insert_clause nvarchar(max),
        @dynamic_update_clause nvarchar(max),
        @merge_dynamic_sql nvarchar(max)

-- Shape of the sp_pkeys result set.
DECLARE @pk_table TABLE
(
    table_qualifier sysname,
    table_owner sysname,
    table_name sysname,
    column_name sysname,
    key_seq int,
    pk_name sysname
)

-- Insertable/updatable columns of the local table, in declaration order.
DECLARE @column_table TABLE
(
    column_id int PRIMARY KEY,
    column_name sysname
)

---------------------------------------------------------------------------
-- Argument validation: a NULL anywhere in the concatenation below would
-- turn the whole statement into NULL, so fail loudly up front.
---------------------------------------------------------------------------
IF @remote_server_name IS NULL OR @remote_schema_name IS NULL OR @remote_table_name IS NULL
    BEGIN
        RAISERROR ('usp_dynamic_merge: @remote_server_name, @remote_schema_name and @remote_table_name are required.', 16, 1)
        RETURN
    END

-- The cut-off filter is only emitted when @is_oracle is 0 or 1, so the column
-- name is only required in that case.
IF @merge_last_modified_enabled = 1 AND @is_oracle IS NOT NULL AND @merge_last_modified_column IS NULL
    BEGIN
        RAISERROR ('usp_dynamic_merge: @merge_last_modified_column is required when @merge_last_modified_enabled = 1.', 16, 1)
        RETURN
    END

SET @local_object_id = OBJECT_ID(QUOTENAME(@local_schema_name) + N'.' + QUOTENAME(@local_table_name))

IF @local_object_id IS NULL
    BEGIN
        RAISERROR ('usp_dynamic_merge: local table %s.%s was not found.', 16, 1, @local_schema_name, @local_table_name)
        RETURN
    END

IF @merge_last_modified_enabled = 1 AND @merge_last_modified_dt IS NULL
    BEGIN
        --SET A DEFAULT OF 3 DAYS IF NULL IS PASSED IN
        SET @merge_last_modified_dt = DATEADD(dd, DATEDIFF(dd, 0, GETUTCDATE()) - 3, 0)
    END

---------------------------------------------------------------------------
-- Names and aliases used on the local side of the MERGE.
---------------------------------------------------------------------------
SET @target_name  = QUOTENAME(@local_schema_name) + N'.' + QUOTENAME(@local_table_name)
SET @target_alias = QUOTENAME(@local_schema_name + N'_TARGET')
SET @source_alias = QUOTENAME(@local_schema_name + N'_SOURCE')

---------------------------------------------------------------------------
-- Join clause: every primary key column, in key order (not just the first two).
---------------------------------------------------------------------------
INSERT INTO @pk_table
 EXEC sp_pkeys @table_name = @local_table_name, @table_owner = @local_schema_name

IF NOT EXISTS (SELECT * FROM @pk_table)
    BEGIN
        RAISERROR ('usp_dynamic_merge: local table %s.%s has no primary key to merge on.', 16, 1, @local_schema_name, @local_table_name)
        RETURN
    END

-- FOR XML PATH(''), TYPE + .value() concatenates the rows without entity-encoding
-- characters such as & or < in column names; STUFF strips the leading ' AND '.
SELECT @merge_join_clause = N'ON ' + STUFF((
            SELECT N' AND ' + @target_alias + N'.' + QUOTENAME(pk.column_name)
                 + N' = ' + @source_alias + N'.' + QUOTENAME(pk.column_name)
              FROM @pk_table pk
             ORDER BY pk.key_seq
               FOR XML PATH(''), TYPE).value('.', 'nvarchar(max)'), 1, 5, N'')

---------------------------------------------------------------------------
-- Column lists. Computed columns cannot be inserted or updated, so skip them.
---------------------------------------------------------------------------
INSERT INTO @column_table (column_id, column_name)
SELECT c.column_id, c.name
  FROM sys.columns c
 WHERE c.object_id = @local_object_id
   AND c.is_computed = 0

-- col1,col2,...                      (remote SELECT list, unquoted)
SELECT @remote_column_list = STUFF((
            SELECT N',' + ct.column_name
              FROM @column_table ct
             ORDER BY ct.column_id
               FOR XML PATH(''), TYPE).value('.', 'nvarchar(max)'), 1, 1, N'')

-- [col1],[col2],...                  (INSERT column list)
SELECT @local_column_list = STUFF((
            SELECT N',' + QUOTENAME(ct.column_name)
              FROM @column_table ct
             ORDER BY ct.column_id
               FOR XML PATH(''), TYPE).value('.', 'nvarchar(max)'), 1, 1, N'')

-- [x_SOURCE].[col1],...              (INSERT VALUES list)
SELECT @dynamic_insert_clause = STUFF((
            SELECT N',' + @source_alias + N'.' + QUOTENAME(ct.column_name)
              FROM @column_table ct
             ORDER BY ct.column_id
               FOR XML PATH(''), TYPE).value('.', 'nvarchar(max)'), 1, 1, N'')

-- UPDATE SET [x_TARGET].[col1] = [x_SOURCE].[col1],...
SELECT @dynamic_update_clause = N'UPDATE SET ' + STUFF((
            SELECT N',' + @target_alias + N'.' + QUOTENAME(ct.column_name)
                 + N' = ' + @source_alias + N'.' + QUOTENAME(ct.column_name)
              FROM @column_table ct
             ORDER BY ct.column_id
               FOR XML PATH(''), TYPE).value('.', 'nvarchar(max)'), 1, 1, N'') + N' '

---------------------------------------------------------------------------
-- Remote pass-through query, built as plain text first; its single quotes are
-- doubled once, below, when it is embedded in the OPENQUERY string literal.
---------------------------------------------------------------------------
SET @remote_object_name = ISNULL(@remote_database_name + N'.', N'')
                        + @remote_schema_name + N'.' + @remote_table_name

SET @remote_query = N'SELECT ' + @remote_column_list + N' FROM ' + @remote_object_name

IF @merge_last_modified_enabled = 1 AND @is_oracle IS NOT NULL
    BEGIN
        -- Style 120 = yyyy-mm-dd hh:mi:ss (24h), truncated to whole seconds.
        SET @cutoff_text = CONVERT(char(19), @merge_last_modified_dt, 120)

        SET @remote_query = @remote_query
            + N' WHERE ' + @merge_last_modified_column + N' >= '
            + CASE WHEN @is_oracle = 1
                   THEN N'TO_DATE(''' + @cutoff_text + N''', ''yyyy-mm-dd hh24:mi:ss'')'
                   ELSE N'''' + @cutoff_text + N'.000'''
              END
    END

---------------------------------------------------------------------------
-- Assemble the MERGE statement.
---------------------------------------------------------------------------
SET @merge_dynamic_sql = 
            CASE WHEN @output_enabled = 1
            -- Only the action is needed for the summary, so the key value is not
            -- captured (the old int column failed for non-integer keys).
            THEN N'DECLARE @merge_output TABLE (table_name sysname,operation sysname);'
            ELSE N''
            END
            + N' '
            + N'MERGE ' + @target_name + N' AS ' + @target_alias + N' '
            + N'USING OPENQUERY(' + QUOTENAME(@remote_server_name) + N', '''
            + REPLACE(@remote_query, N'''', N'''''')
            + N''') AS ' + @source_alias + N' '
            + @merge_join_clause
            + N' '
            + N'WHEN MATCHED THEN '
            + @dynamic_update_clause 
            + N'WHEN NOT MATCHED BY TARGET THEN ' 
            + N'INSERT (' + @local_column_list + N') '
            + N'VALUES (' + @dynamic_insert_clause + N') '
            +
            CASE WHEN @merge_delete_enabled = 1
            THEN N'WHEN NOT MATCHED BY SOURCE THEN DELETE '
            ELSE N''
            END
            +    
            CASE WHEN @output_enabled = 1
            THEN N'OUTPUT N''' + REPLACE(@local_schema_name + N'.' + @local_table_name, N'''', N'''''') + N''', '
                + N'$action '
                + N'INTO @merge_output (table_name, operation); '
                + N'SELECT table_name, '
                + N'operation, '
                + N'count(*) as total '
                + N'FROM @merge_output '
                + N'GROUP BY operation, '
                + N'table_name;'
            ELSE N';'
            END

-- Last line of defence: never hand a NULL statement to sp_executesql, which
-- would succeed without doing anything.
IF @merge_dynamic_sql IS NULL
    BEGIN
        RAISERROR ('usp_dynamic_merge: the MERGE statement could not be built; check the supplied arguments.', 16, 1)
        RETURN
    END

IF @debug_enabled = 0
BEGIN                    
    EXEC sp_executesql @merge_dynamic_sql;
END
ELSE
BEGIN
    -- Debug mode (also used when @debug_enabled is NULL): show, do not run.
    -- NOTE: PRINT truncates long strings; very wide tables may not print in full.
    PRINT @merge_dynamic_sql
END

GO

Rate

5 (4)

Share

Share

Rate

5 (4)