Blog Post

Easy CDC in 5 hours (less for you!) using automatic code generation.

,

Updates:

Update #3:

Another reformat, thanks to andreas for giving me the heads up. The pre tags aren't behaving across browsers so I have had to set a fixed width of 600 pixels. I have also set the inline code text to 10 pixel consolas, which still looks pretty decent and lets you see more of the code. If you don't have consolas I highly recommend downloading it.

Also, I noticed that I've left out of the code something I updated in our implementation: when concatenating the cast columns together for the hash, you should add a delimiter between column values, to eliminate possibilities like one where two columns with the values "abc" and "de", which are then updated to the values "ab" and "cde", results in an unchanged hash value for the row even though the row has changed. The updated version is in the code file, which you can find here:

All code for this post as a single file

Update #2:

When I ran the initial performance test it was late at night and a bunch of other things were going on (integrity checks, maintenance plans, nightly jobs). A second test ran the next day (with a clean rebuild, so no saved plans) ran in 3 seconds. It would be worthwhile to force a recompilation after the initial load, because the statistics will have significantly changed. As it happens, we will probably do the initial load using bulk inserts, which I also intend to autogenerate. That will be the subject of my next post.

Introduction

There's currently a project going on at my organization to build an all new BI system, including a new enterprise DW and data marts. It's sorely needed.

As a first step, the business wanted to start capturing changed data from the main ERP system as soon as possible. This is a vendor supplied system, and has no inherent support for easy change data capture. For instance, there is not an "inserted date" and "update date" column on every table. In fact, such columns are virtually nonexistent.

I was brought into this last week. The guys working on it had committed to start capturing changed data as of January 1st. I was told that they had figured out which tables they wanted to capture data for, but the implementation was a race against time. We are operating on SQL Server 2005 and don't have 2008's built in CDC available to use. Even if we did, it is unlikely we could build a system using that technology before the deadline.

At home that night I sketched out a quick way of building a CDC system for the ERP using straight TSQL. The CDC system itself (by which I mean the DDL for all tables and ETL procedures) would be generated using functions that spit out the required DDL for each table that had been flagged for change capture.

The Metadata-Gathering Function

First you need a good function to extract metadata. The one I used was simplified by the fact that there are no user types in the system, and we will not be considering any BLOBs, chars or varchars with a length greater than 255 for change capture. The function needs to return the key columns (which we will use later for creating our join conditions). Here's the one I wrote:

------------------------------------------------------------------------------------------------------------
-- return metadata about a given table
------------------------------------------------------------------------------------------------------------
alter function CDC.source_column_definitions (
@schema_name sysname, 
@table_name sysname
) 
returns table as
    return (
        select      
                  tb.table_schema,
                  tb.table_name,
                  co.column_name,
                  co.data_type + 
                  case
                     when co.data_type like '%char%' or co.data_type like '%binary%' then
                        case
                           when co.character_maximum_length = -1 then '(max)'
                           else '(' + cast(co.character_maximum_length  as varchar(4)) + ')'
                        end
                     when co.data_type in ('decimal', 'numeric') 
                     then '(' + cast(co.numeric_precision as varchar(4)) 
                          + ', ' + cast(co.numeric_scale as varchar(4)) + ')'
                     else ''
                  end   as data_type,
                  '[' + co.column_name  + '] ' +
                  co.data_type + 
                  case
                     when co.data_type like '%char%' or co.data_type like '%binary%' then
                        case
                           when co.character_maximum_length = -1 then '(max)'
                           else '(' + cast(co.character_maximum_length  as varchar(4)) + ')'
                        end
                     when co.data_type in ('decimal', 'numeric') 
                     then '(' + cast(co.numeric_precision as varchar(4)) + ', ' 
                          + cast(co.numeric_scale as varchar(4)) + ')'
                     else ''
                  end + 
                  case 
                     when co.IS_NULLABLE = 'NO' then ' not null' 
                     else '' 
                  end  as column_definition,
                  co.ordinal_position,
                  kc.ordinal_position as key_ordinal_position           
                    
        from      ERP_DB.information_schema.tables             tb     
        join      ERP_DB.information_schema.columns            co  on  co.table_catalog = tb.table_catalog
                                                                   and co.table_schema = tb.table_schema
                                                                   and co.table_name = tb.table_name
        left join ERP_DB.information_schema.table_constraints  tc  on  tc.table_catalog = tb.table_catalog
                                                                   and tc.table_schema = tb.table_schema
                                                                   and tc.table_name = tb.table_name
                                                                   and tc.constraint_type = 'PRIMARY KEY'
        left join ERP_DB.information_schema.key_column_usage   kc  on  kc.constraint_catalog = tc.constraint_catalog
                                                                   and kc.constraint_schema = tc.constraint_schema
                                                                   and kc.constraint_name = tc.constraint_name
                                                                   and kc.column_name = co.column_name
        where     (co.character_maximum_length is null or co.character_maximum_length between 0 and 255)
        AND       co.DATA_TYPE != 'timestamp'
        and       tb.table_schema like @schema_name
        and       tb.table_name like @table_name                                                            
    )
go

In case it's not obvious, ERP_DB in the above code is a placeholder for the name of the ERP database.

A Few More Specifications...

There is a requirement that the business is able to query the CDC data for a view of the system at a point in time. In order to do this, two extra columns are added to the CDC side: the effective start and end date for the row. You may recognize this as the same idea behind a Kimball method type 2 slowly changing dimension. Then, for any given datetime value, the rows effective at that point in time are all rows in all tables where the given datetime is between the effective start and end date columns. I decided to give the latest rows an effective end date of 9999-12-31, being the maximum value of a datetime.

We decided to detect changes by using a hash of the columns we are interested in. A row is "new" when the primary key does not exist in the CDC system. A row is changed where a current row (effective end date of 9999-12-31) for the key value does exist, but the hash of the concatenated column values has changed. This design is unable to cope with changes to primary key
values, but there is no way around that.

With these requirements in mind, we now needed to create 200 tables to actually store all of the changed rows. Here's a function that will generate the required DDL for a destination CDC table.

The Table-Generation Function

-------------------------------------------------------------------------------
-- returns the ddl required to create the table that will hold captured changes
-------------------------------------------------------------------------------
alter function CDC.destination_table_DDL (
@source_table_name sysname
) 
returns varchar(max) 
as begin
   
   -- inititalize with DDL common to all CDC tables
   declare @DDL varchar(max) set @DDL = 
      'create table CDC.[' + @source_table_name + ']'
      + ' (row_effective_start_DT datetime not null, row_effective_end_DT datetime not null,'
      + ' row_hash varbinary(20) not null, '
      
   declare @index_ddl varchar(max) set @index_ddl = ''
   -- use concatenation trick to build column and index definitions
   select   @DDL = @DDL + t.column_definition + ', ',
            @index_ddl = @index_ddl + case 
                when t.key_ordinal_position is null then '' 
                else t.column_name + ', ' 
            end
   from     CDC.source_column_definitions('dbo', @source_table_name) t
   order by t.ordinal_position asc
   -- remove trailing comma, close table definition and add terminator
   set @DDL = left(@DDL, len(@DDL) - 1) + '); ' 
   
   -- add primary key
   set @DDL = @DDL
      + 'alter table CDC.[' + @source_table_name + '] add constraint PK_' + @source_table_name
      + ' primary key clustered (row_effective_start_DT, ' 
      + left(@index_ddl, len(@index_ddl) - 1) + '); '
   
   -- add unique index
   set @DDL = @DDL
      + 'create unique index ux_' + @source_table_name + ' on CDC.' + @source_table_name
      + '(' + @index_ddl 
      + 'row_effective_end_DT) include (row_hash, row_effective_start_DT); '
   
   -- add foreign key to audit schema. Comment this out if there is no audit schema
   set @DDL = @DDL 
      + 'alter table CDC.' + @source_table_name + ' add constraint FK_' + @source_table_name
      + '__ETL_executions foreign key (row_effective_start_DT) '
      + 'references CDC.ETL_executions (ETL_cutoff_DT);'
   
   return @DDL
end    
go

As you can see, there are some extra columns that will be common to all of the CDC data tables being the start and end dates, and we also want the row hash so we can compare it against the live system each time we run an ETL

This procedure will generate keys and indexes based on the existing primary keys in the ERP. We also have a foreign key constraint to an ETL control table. Note that nowhere in this article have I included DDL for the ETL control and auditing structure. You can either comment this constraint out, or create the appropriate control schema.

Changed rows will only be captured when the ETL is run, so if a row changes three or four times between executions, we will only get the last change. That's fine for us, it might not be for you. You could of course simply run the ETL more frequently, but keep in mind it will be querying essentially the entire system (as you will shortly see below), so you may not want to do that. For our purposes, once a day is enough. There's another issue with trying to run multiple ETL's while the business is creating data: It can break the integrity in the CDC system. I will explain this in my next post, since this one is already too long.

The reason for these particular indexes will become clear as we move on to the actual ETL mechanism, so let me do that now. Here's the function which returns the required DDL to create the ETL procedure for a given table.

The Procedure-Generation Function

--------------------------------------------------------------------------------------------------------
-- gets the ddl required to create the procedure to 
-- ETL changed data from the source to the CDC system
--------------------------------------------------------------------------------------------------------
alter function CDC.ETL_procedure_DDL (
@source_table_name sysname
) 
returns varchar(max) 
as begin
    
    -- used for easy search and replace to allow reformat of output to match template formatting
    declare @n char(2) set @n = '&'    
    
    -- initialize with template for merge
    declare @DDL varchar(max) set @DDL = 
    
    'create procedure CDC.ETL_{table} (@ETL_cutoff_DT datetime) as begin                              ' + @n +
    ' set nocount on                                                                                  ' + @n +
    '                                                                                                 ' + @n +
    ' declare @RC int                                                                                 ' + @n +
    '                                                                                                 ' + @n +
    ' -- do pre-execute audit stuff here                                                              ' + @n +
    '                                                                                                 ' + @n +
    ' begin tran                                                                                      ' + @n +
    '                                                                                                 ' + @n +
    ' begin try                                                                                       ' + @n +
    '                                                                                                 ' + @n +
    '     insert    CDC.{table} (row_effective_start_DT, row_effective_end_DT, row_hash, {cols})      ' + @n +
    '     select    @ETL_cutoff_DT, ''9999-12-30'', src.row_hash, {src_cols}                          ' + @n +
    '     from      (                                                                                 ' + @n +
    '               select      hashbytes(''SHA1'', {cast_cols}) as row_hash, {cols}                  ' + @n +
    '               from        [ERP_DB].dbo.{table}                                                  ' + @n +
    '               )                               src                                               ' + @n +
    '     left join CDC.{table}                     dst on  {join_cols}                               ' + @n +
    '                                                   and dst.row_effective_end_DT = ''9999-12-31'' ' + @n +
    '                                                   and src.row_hash = dst.row_hash               ' + @n +
    '     where       dst.row_effective_end_DT is null                                                ' + @n +
    '                                                                                                 ' + @n +
    '     set @RC = @@rowcount                                                                        ' + @n +
    '                                                                                                 ' + @n +
    '     if (@RC > 0) begin                                                                          ' + @n +
    '         update  dst                                                                             ' + @n +
    '         set     dst.row_effective_end_DT = case                                                 ' + @n +
    '                     when dst.row_effective_end_DT = ''9999-12-31'' then @ETL_cutoff_DT          ' + @n +
    '                     when dst.row_effective_end_Dt = ''9999-12-30'' then ''9999-12-31''          ' + @n +
    '                 end                                                                             ' + @n +
    '         from    CDC.{table}   dst                                                               ' + @n +
    '         join    CDC.{table}   src on  {join_cols}                                               ' + @n +
    '                                   and dst.row_effective_end_DT                                  ' + @n +
    '                                       between ''9999-12-30'' and ''9999-12-31''                 ' + @n +
    '                                   and src.row_effective_start_DT = @ETL_cutoff_DT               ' + @n +
    '                                                                                                 ' + @n +
    '     end                                                                                         ' + @n +
    '                                                                                                 ' + @n +
    '     commit                                                                                      ' + @n +
    '                                                                                                 ' + @n +
    '     -- do successful execution audit stuff here                                                 ' + @n +
    '                                                                                                 ' + @n +
    ' end try begin catch                                                                             ' + @n +
    '                                                                                                 ' + @n +
    '     rollback                                                                                    ' + @n +
    '                                                                                                 ' + @n +
    '     -- do failed execution audit stuff here                                                     ' + @n +
    '                                                                                                 ' + @n +
    ' end catch                                                                                       ' + @n +
    'end                                                                                              ' + @n +
    'GO'
    declare @cols varchar(max) set @cols = ''
    declare @join_cols varchar(max) set @join_cols = ''
        
    -- build strings
    select      @cols = @cols + t.column_name + ', ',
                @join_cols = @join_cols + case 
                    when t.key_ordinal_position is null then '' 
                    when t.key_ordinal_position = 1 then 'src.' + t.column_name + ' = dst.' + t.column_name
                    else ' and src.' + t.column_name + ' = dst.' + t.column_name
                end
    from        CDC.source_column_definitions('dbo', @source_table_name) t
    order by    t.key_ordinal_position
                
    -- remove trailing comm
    set @cols = left(@cols, len(@cols) - 1)
        
    -- merge with template  
    set @DDL = replace (@DDL, '{table}', @source_table_name) 
    
    set @DDL = replace(@DDL, '{cols}', @cols)
    set @cols = 'src.' + replace(@cols, ', ', ', src.')
    set @DDL = replace(@DDL, '{src_cols}', @cols)
    set @cols = replace(@cols, 'src.', 'isnull(cast(') 
    set @cols = replace(@cols, ', ', ' as varchar(255)), '''') + ')
    set @cols = @cols + ' as varchar(255)), '''')'
    set @DDL = replace(@DDL, '{cast_cols}', @cols) 
    set @DDL = replace(@DDL, '{join_cols}', @join_cols)
        
    return @DDL
end
go

NOTE: As written this won't work, as the template doesn't include the inserts into the control table, and thus the foriegn key will be violated. As before, since everyone's auditing structure is likely to be different, I leave the insertion of that into the template as an exercise for the reader.

The Autogeneration Logic

The mechanism here is something like a mail merge. I create a common template, and then replace brace-delimited placeholders with the table-specific information. I have the template nicely formatted so it's easy for me to see problems, since this is just a string and I have no actual schema to validate against! Each line of the template is terminated with an ampersand so that the output can be pasted into, say, textpad, and a search and replace can be run to replace those ampersands with newlines, which will mean the final DDL will also be nicely formatted.

Due to the way the column list strings are built we need to drop the trailing commas. It's also important to select from our metadata function in some known order for the key columns (any will do, but it makes the string generation easiest to just use the key ordinal position), because our join condition code changes for composite PK tables, with the first column used being a simple column1 = column2, but further join predicates requiring an "and" be included in the DDL.

Note also that I'm using a concatenation trick in this code. Be careful, this may not work with parallelism enabled, but SQL probably won't use parallelism here. If you're worried you can always add a maxdop hint.

The ETL Logic

As far as the generated stored procedure logic goes, I first do the insert of new rows with another special value of 9999-12-30. I do this in order to be able to process a table in just two passes while only having to query the source data once instead of twice. After inserting the new rows with the second special end date value of 9999-12-30, I run an update which does two things: 1) it expires any matching "current" rows (rows with the same key value(s) as the ones I just inserted but with an end date of 9999-12-31) by changing their expiry date to the date of the ETL execution (passed in as a parameter), and 2) it updates the rows I just inserted, being the ones with the 9999-12-30 special value and a start date of the ETL execution date, which now become the current rows by setting their expiry to 9999-12-31. The advantage of doing it this way as opposed to using a temp table, which might seem more obvious, is that I get to use fully indexed joins for both the initial left join to the source, and the subsequent update operation.

The new rows cannot simply be inserted with the 9999-12-31 end date, because if this was done there would be more than one row with the same key value and end date, violating the integrity of the data (and the unique constraint which is there to protect that integrity!)

If you have an auditing and control mechanism you can also add the required sql to the template, I've added comments at good spots. The pre-execute is probably an insert saying the table load as started with the execution date, and the post will be an update to that inserted audit row. Somewhere outside of all of these procedures you should create an audit master row for the entire ETL. That would be the table I had a foreign key to earlier.

Performance

I did process one of the tables in the system using the generated objects as a quick test. It contains 71000 rows (which is small), however since the CDC tables currently have no data, all 71000 rows will be considered changed... and that's a lot of changes in one ETL execution. That's a worst case scenario for performance, but this design processes those 71000 changed rows in about 7 seconds. I tried another table with 8500 rows (again, all of which will be considered as changed on an initial load), which took no time at all according to SSMS.

Conclusion.. and How to Use This Stuff

So there you go, this code will generate an entire CDC implementation, against a system that has no inherent support for it.

You can quickly generate all the necessary DDL by filling a table with the names of all the tables you want to capture, and then running the functions against the table. Something like this:

-- this table will hold the name of all the tables you want to capture
create table CDC.tables (table_name sysname)
-- do your inserts here....
-- get the DDL for the CDC objects
select      CDC.destination_table_ddl(t.table_name), CDC.ETL_procedure_DDL(t.table_name)
from        CDC..tables t

Don't forget to replace the ampersands in the output procedure definition, and the ERP_DB placeholders in the metadata functions!

Rate

You rated this post out of 5. Change rating

Share

Share

Rate

You rated this post out of 5. Change rating