Incremental Data Loading Using CDC

  • markoos1

    SSChasing Mays

    Points: 653

    Comments posted to this topic are about the item Incremental Data Loading Using CDC

  • sqlserver-1064800

    Valued Member

    Points: 64

    Hi Mark,

    Thanks for writing this article; it was interesting to see a different way of implementing CDC incremental loading. I have also implemented an incremental load using CDC, and I found that the really important part is ensuring synchronisation between the data warehouse tables and the tracked tables. You mention a couple of methods for resynchronisation in your article, and I have an additional method that works quite reliably for me, which I thought you might be interested in.

    In my solution I have a SQL replication step between the production OLTP and the data warehouse. This gives me control over the data flow into the monitored tables and allows me to pause replication and apply changes to the data warehouse. Once finished, I can restart the data flow and CDC monitoring without losing any of the change data. This of course requires the replication subscriptions to be set up not to expire during the stoppage.

    Cheers

    Martin

  • Koen Verbeeck

    SSC Guru

    Points: 258950

    Nice article, thanks for sharing.

    Need an answer? No, you need a question
    My blog at https://sqlkover.com.
    MCSE Business Intelligence - Microsoft Data Platform MVP

  • Arjun SreeVastsva

    SSCertifiable

    Points: 7135

    Got knowledge of incremental data loading. Thanks for the post.

  • casinc815

    Old Hand

    Points: 303

    Mark:

    Great article. Nice clarification and demonstration of CDC.

    It has often been a problem explaining to programmers the intent of CDC and how to execute the project correctly.

    One note: there are no footnotes to your previous two articles. For completeness in discussing CDC, can you either post them or send them to me?

    Thanks!

  • markoos1

    SSChasing Mays

    Points: 653

    casinc815,

    The links are at the beginning. They are: Article 1 and Article 2.

    -Mark

  • casinc815

    Old Hand

    Points: 303

    thanks, Mark!

  • JazLady

    Old Hand

    Points: 358

    I read the article about Incremental Data Loading using CDC, but can you create an article with a step-by-step demo on how to capture changes for a bulk load and an incremental load from a source that does not have CDC (like SQL Server 2005)? Most of my tables don't have a datetime column, so I can't tell from a datetime alone whether a record is new or updated. Can you help me?

    I need to do an initial bulk load from 3 servers merged into one server database, and then perform incremental updates in real time or each night.

  • RichB

    SSCrazy Eights

    Points: 9651

    Still a good article, but it would be even better if you addressed some of its potential pitfalls, such as runaway transaction log growth if the system can no longer be cleaned out properly.
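
    Whether CDC is what is holding the log can be checked from the catalog. A minimal sketch, assuming only the documented system objects sys.databases, sys.sp_cdc_help_jobs and sys.dm_cdc_log_scan_sessions; run it in the CDC-enabled database, and treat the interpretation in the comments as a starting point rather than a full diagnosis:

    ```sql
    -- A log_reuse_wait_desc of REPLICATION on a CDC-enabled database usually means
    -- the log cannot be truncated until the capture job has read it.
    select d.name, d.log_reuse_wait_desc
    from sys.databases d
    where d.is_cdc_enabled = 1;

    -- Settings of the capture and cleanup jobs (polling interval, retention, ...).
    exec sys.sp_cdc_help_jobs;

    -- Recent log scan sessions: a growing latency or a non-zero error_count
    -- suggests the capture job is falling behind or failing.
    select top (10)
        s.session_id, s.start_time, s.end_time, s.latency, s.error_count, s.tran_count
    from sys.dm_cdc_log_scan_sessions s
    order by s.start_time desc;
    ```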

  • jhasbufuidh

    SSC Eights!

    Points: 992

    Hello,

    Great article. I would like to know more about how to handle disruptions:

    - Server crash

    - SQL Server instance restart

    - CDC package fails halfway

    - CDC package fails due to a timeout or network error

    - CDC package fails due to a full transaction log

    - Source table DDL changes, like adding a column and populating it

    Should I just rerun the CDC in Reprocessing mode?

    In my scenario I track a table which is updated once a day.
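
    Whether an incremental run can safely resume after a disruption depends largely on whether the last LSN the package processed is still inside the capture instance's validity interval. A minimal sketch for checking that interval, assuming a hypothetical capture instance named dbo_MyTable:

    ```sql
    -- dbo_MyTable is a placeholder capture instance name, not one from this thread.
    select
        sys.fn_cdc_get_min_lsn(N'dbo_MyTable') as min_lsn, -- low endpoint; moves forward as cleanup runs
        sys.fn_cdc_get_max_lsn() as max_lsn;               -- highest LSN captured so far
    ```

    If the LSN stored by the last successful run is lower than min_lsn, some changes have already been cleaned up and the target has to be reloaded; otherwise the incremental load can normally continue from the stored LSN.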

    BI Guy

  • jhasbufuidh

    SSC Eights!

    Points: 992

    I did some testing.

    Added the reprocessing column option to the CDC source.

    Performed a couple of DDL and DML changes on the tracked table.

    Ran the CDC update package, which failed. The timeout was 520 for the test.

    The incremental CDC package would now fail with one warning about reprocessing mode and a timeout error.

    Truncating the target table and running a 10-minute initial load was the only way I found to get the incremental update to work again.

    BI Guy

  • R.P.Rozema

    SSChampion

    Points: 12300

    I've run CDC for incremental loading in several projects and I've found that it a) has some bugs and b) has performance issues with larger datasets. I've reported these issues to Microsoft in an MS Connect item, including a procedure that generates alternatives to the fn_cdc_get_net_changes_<capture_instance>() functions that do return correct results, to illustrate the problems. The attachment does not (yet) show in Connect, however, so I'm sharing it here too, hoping you can all vote up my Connect item so that MS seriously considers fixing the log reader.

    https://connect.microsoft.com/SQLServer/feedback/details/3136110/cdc-functions-fn-cdc-get-net-changes-are-slow-and-incorrect-due-to-incorrect-operation-values
    Please vote up!

    The procedure below is the attachment I sent to MS. It generates alternative fn_cdc_get_net_changes_...() functions that do not return the superfluous rows with __$operation = 1 that deferred updates generate. The generated functions are highly optimised to be as fast as, if not faster than, MS' "fixed" version. However, if your data has a lot of 'deleted' rows (__$operation equals 1), the fix requires an additional scan on the _CT table that MS' version does not require, so in these situations the fix may cost you a little performance. The better solution would be for MS to fix the log reader so that it never puts deferred updates into the _CT tables as pairs of 1 & 2, in which case the fn_cdc_get_net_changes functions could get really blazing fast, even for large data sets!
    CREATE PROCEDURE [dbo].[spGenerateCustomCDCnetFunctions]
    as
    begin
       set nocount on;

       if object_id('tempdb..#definitions') is not null
           drop table #definitions;

       raiserror('Generating function bodies...', 0, 0) with nowait;

            select
             ct.object_id,
             '[cdc].' + quotename('fn_rpr_get_net_changes_' + ct.capture_instance ) as functionName,
       N'CREATE FUNCTION [cdc].' + quotename('fn_rpr_get_net_changes_' + ct.capture_instance)
                + s.crlf + N'(    @from_lsn binary(10),'
                + s.crlf + N'    @to_lsn binary(10),'
                + s.crlf + N'    @row_filter_option nvarchar(30)'
                + s.crlf + N')'
                + s.crlf + N'returns table'
                + s.crlf + N'return'
                + s.crlf + N''
                + s.crlf + N' select'
                + s.crlf + N'  NULL as __$start_lsn,'
                + s.crlf + N'  NULL as __$operation,'
                + s.crlf + N'  NULL as __$update_mask'

        + (
         select N','
        + s.crlf + N'  NULL as ' + quotename(col.column_name) as [text()]
         from cdc.captured_columns col
         where col.object_id = ct.object_id
         order by col.column_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N' where ( [sys].[fn_cdc_check_parameters]( N' + quotename(ct.capture_instance, '''') + ', @from_lsn, @to_lsn, lower(rtrim(ltrim(@row_filter_option))), 1) = 0)'
                + s.crlf + N''
                + s.crlf + N' union all'
                + s.crlf + N''
                + s.crlf + N''
                + s.crlf + N' select'
                + s.crlf + N'  l.[__$start_lsn],'
                + s.crlf + N'  case'
                + s.crlf + N'   when l.__$operation = 1 then 1'
                + s.crlf + N'   when l.__$operation = 2 then 5'
                + s.crlf + N'   when l.__$operation = 4 then 5'
                + s.crlf + N'  end as __$operation,'
                + s.crlf + N'  NULL as [__$update_mask]'

        + (
         select N','
        + s.crlf + N'  l.' + quotename(col.column_name) as [text()]
         from cdc.captured_columns col
         where col.object_id = ct.object_id
         order by col.column_ordinal
         for xml path(''), type
        ).value('.','nvarchar(max)')

                + s.crlf + N' from ('
                + s.crlf + N'   select top (1) NULL as c'
                + s.crlf + N'   from ('
                + s.crlf + N'     select'
                + s.crlf + N'      lower(rtrim(ltrim(@row_filter_option))),'
                + s.crlf + N'      N' + quotename( ct.capture_instance, '''')
                + s.crlf + N'    ) p ([filter_option], [capture_instance])'
                + s.crlf + N'   where p.[filter_option] = N''all with merge'''
                + s.crlf + N'       and [sys].[fn_cdc_check_parameters]( p.[capture_instance], @from_lsn, @to_lsn, p.filter_option, 1) = 1'
                + s.crlf + N'  ) c'
                + s.crlf + N'  cross join ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l -- Last operation per key value.'
                + s.crlf + N'  inner join ('
                + s.crlf + N'    select'

        + (
         select
                 s.crlf + N'     m.' + quotename(ic.column_name) + ',' as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'     max(m.__$seqval) as __$seqval_max, -- Sequence value for Last operation.'
                + s.crlf + N'     min(m.__$seqval) as __$seqval_min -- Sequence value for First operation.'
                + s.crlf + N'    from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' m'
                + s.crlf + N'    where m.__$start_lsn >= @from_lsn'
                + s.crlf + N'     and m.__$start_lsn <= @to_lsn'
                + s.crlf + N'     and m.__$operation in (1,2,4)'
                + s.crlf + N'    group by'

        + stuff((
         select
           N','
                + s.crlf + N'     m.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)'), 1, 1, '')

                + s.crlf + N'   ) x on ('

        + stuff((
         select N' and x.' + quotename(ic.column_name) + ' = l.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)'), 1, 5, '') + N' and x.__$seqval_max = l.__$seqval)'

                + s.crlf + N'  where l.__$start_lsn >= @from_lsn'
                + s.crlf + N'   and l.__$start_lsn <= @to_lsn'
                + s.crlf + N'            and (l.__$operation = 4 -- New values for updates.'
                + s.crlf + N'    or l.__$operation = 2 -- Inserted values or New values for deferred updates.'
                + s.crlf + N'    or (l.__$operation = 1 -- Old values for deferred updates or deletes.'
                + s.crlf + N'      and not exists ('
                + s.crlf + N'        -- Suppress all deletes that are the old values part of a deferred update.'
                + s.crlf + N'        -- We only use the other part of the deferred update, the row with __$operation = 2.'
                + s.crlf + N'        select *'
                + s.crlf + N'        from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l3'
                -- "where 1 = 1" lets every key predicate start with "and", so no
                -- whitespace-dependent stuff() offset is needed to strip the first one.
                + s.crlf + N'        where 1 = 1'

        + (
         select
                s.crlf + N'         and l3.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'         and l3.__$seqval = x.__$seqval_max'
                + s.crlf + N'         and l3.__$operation = 2'
                + s.crlf + N'       )'
                + s.crlf + N'      and not exists ('
                + s.crlf + N'        -- If there is only 1 sequence for row ''l'', we don''t need to check for previous'
                + s.crlf + N'        -- actions (hence the not x.__$seqval_max = x.__$seqval_min). But if multiple'
                + s.crlf + N'        -- sequences do exist, we need to suppress those deletes for rows that were'
                + s.crlf + N'        -- first inserted within this same range (__$seqval_min). Net result should'
                + s.crlf + N'        -- look as if the row never existed at all. But again, we need to exclude'
                + s.crlf + N'        -- deferred updates here too: if both __$operation = 1 and __$operation = 2'
                + s.crlf + N'        -- exists it is a deferred update instead of an insert.'
                + s.crlf + N'        select *'
                + s.crlf + N'        from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
                + s.crlf + N'        where not x.__$seqval_max = x.__$seqval_min'

        + (
         select
                + s.crlf + N'         and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'         and l4.__$seqval = x.__$seqval_min'
                + s.crlf + N'         and l4.__$operation in (1, 2)'
                + s.crlf + N'        having min(l4.__$operation) = 2'
                + s.crlf + N'      )'
                + s.crlf + N'     )'
                + s.crlf + N'    )'
                + s.crlf + N''
                + s.crlf + N' union all'
                + s.crlf + N''
                + s.crlf + N' select'
                + s.crlf + N'  l.[__$start_lsn],'
                + s.crlf + N'  case'
                + s.crlf + N'   when l.__$operation = 1 -- We have already excluded all'
                + s.crlf + N'    then 1     -- deletes of rows that did not'
                + s.crlf + N'           -- exist before the batch started'
                + s.crlf + N'           -- in the where conditions. So any'
                + s.crlf + N'           -- delete(1) __$operation here must'
                + s.crlf + N'           -- be a net delete(1) of a row that'
                + s.crlf + N'           -- existed before the batch.'
                + s.crlf + N'   when exists ('
                + s.crlf + N'      -- If the l.__$operation is not a delete(1),'
                + s.crlf + N'      -- the net __$operation depends on what the'
                + s.crlf + N'      -- first __$operation in this batch is. If'
                + s.crlf + N'      -- the first __$operation is an insert(2), and'
                + s.crlf + N'      -- not one that is part of a deferred update,'
                + s.crlf + N'      -- then the net __$operation is an insert(2).'
                + s.crlf + N'      -- Otherwise the row existed before this batch'
                + s.crlf + N'      -- started and thus the net __$operation is an'
                + s.crlf + N'      -- update(4). For a deferred update both an'
                + s.crlf + N'      -- insert(2) and a delete(1) __$operation'
                + s.crlf + N'      -- exists, for an insert only an insert(2)'
                + s.crlf + N'      -- __$operation exists. So I''m looking for'
                + s.crlf + N'      -- a single __$operation 2 on the first operation.'
                + s.crlf + N'      -- I can''t optimize for the single operation'
                + s.crlf + N'      -- situation here, because the last operation'
                + s.crlf + N'      -- may be a deferred update, giving'
                + s.crlf + N'      -- l.__$operation = 2 whereas the net result'
                + s.crlf + N'      -- should be update(4).'
                + s.crlf + N'      select *'
                + s.crlf + N'      from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
                -- "where 1 = 1" lets every key predicate start with "and", so no
                -- whitespace-dependent stuff() offset is needed to strip the first one.
                + s.crlf + N'      where 1 = 1'

        + (
         select
                s.crlf + N'       and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')
       
                + s.crlf + N'       and l4.__$seqval = x.__$seqval_min'
                + s.crlf + N'       and l4.__$operation in (1, 2)'
                + s.crlf + N'      having min(l4.__$operation) = 2'
                + s.crlf + N'     )'
                + s.crlf + N'    then 2'
                + s.crlf + N'    else 4'
                + s.crlf + N'  end as __$operation,'
                + s.crlf + N'  NULL as [__$update_mask]'

        + (
         select N','
        + s.crlf + N'  l.' + quotename(col.column_name) as [text()]
         from cdc.captured_columns col
         where col.object_id = ct.object_id
         order by col.column_ordinal
         for xml path(''), type
        ).value('.','nvarchar(max)')

                + s.crlf + N' from ('
                + s.crlf + N'   select top (1) NULL as c'
                + s.crlf + N'   from ('
                + s.crlf + N'     select'
                + s.crlf + N'      lower(rtrim(ltrim(@row_filter_option))),'
                + s.crlf + N'      N' + quotename( ct.capture_instance, '''')
                + s.crlf + N'    ) p ([filter_option], [capture_instance])'
                + s.crlf + N'   where p.[filter_option] = N''all'''
                + s.crlf + N'       and [sys].[fn_cdc_check_parameters]( p.[capture_instance], @from_lsn, @to_lsn, p.filter_option, 1) = 1'
                + s.crlf + N'  ) c'
                + s.crlf + N'  cross join ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l -- Last operation per key value.'
                + s.crlf + N'  inner join ('
                + s.crlf + N'   select'

        + (
         select
                + s.crlf + N'     m.' + quotename(ic.column_name) + ',' as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'     max(m.__$seqval) as __$seqval_max, -- Sequence value for Last operation.'
                + s.crlf + N'     min(m.__$seqval) as __$seqval_min -- Sequence value for First operation.'
                + s.crlf + N'    from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' m'
                + s.crlf + N'    where m.__$start_lsn >= @from_lsn'
                + s.crlf + N'     and m.__$start_lsn <= @to_lsn'
                + s.crlf + N'     and m.__$operation in (1,2,4)'
                + s.crlf + N'    group by'

        + stuff((
         select
           N','
                + s.crlf + N'     m.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)'), 1, 1, '')

                + s.crlf + N'   ) x on ('

        + stuff((
         select N' and x.' + quotename(ic.column_name) + ' = l.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)'), 1, 5, '') + N' and x.__$seqval_max = l.__$seqval)'

                + s.crlf + N' where l.__$start_lsn >= @from_lsn'
                + s.crlf + N'  and l.__$start_lsn <= @to_lsn'
                + s.crlf + N'        and (l.__$operation = 4  -- New values for updates.'
                + s.crlf + N'   or l.__$operation = 2 -- Inserted values or New values for deferred updates.'
                + s.crlf + N'   or (l.__$operation = 1 -- Deleted values or Old values for deferred updates.'
                + s.crlf + N'     and not exists ('
                + s.crlf + N'       -- Exclude all deletes that are the old values part of a deferred update.'
                + s.crlf + N'       -- We will only use the other part of the deferred update, the row with'
                + s.crlf + N'       -- l.__$operation = 2.'
                + s.crlf + N'       select *'
                + s.crlf + N'       from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l3'
                -- "where 1 = 1" lets every key predicate start with "and", so no
                -- whitespace-dependent stuff() offset is needed to strip the first one.
                + s.crlf + N'       where 1 = 1'

        + (
         select
                s.crlf + N'         and l3.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'        and l3.__$seqval = x.__$seqval_max'
                + s.crlf + N'        and l3.__$operation = 2'
                + s.crlf + N'      )'
                + s.crlf + N'     and not exists ('
                + s.crlf + N'       -- If there is only 1 operation for row ''l'', we don''t need to check for previous'
                + s.crlf + N'       -- actions (hence the not x.__$seqval_max = x.__$seqval_min). But if multiple'
                + s.crlf + N'       -- sequences do exist, we need to exclude those deletes for rows that were'
                + s.crlf + N'       -- first inserted within this same range (__$seqval_min). Net result should'
                + s.crlf + N'       -- look as if the row never existed at all. But again, we need to account for'
                + s.crlf + N'       -- deferred updates here too: if both __$operation = 1 and __$operation = 2'
                + s.crlf + N'       -- exists the first operation is a deferred update instead of an insert and'
                + s.crlf + N'       -- we should include the delete(1) operation l.'
                + s.crlf + N'       select *'
                + s.crlf + N'       from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
                + s.crlf + N'       where not x.__$seqval_max = x.__$seqval_min'

        + (
         select
                + s.crlf + N'        and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'        and l4.__$seqval = x.__$seqval_min'
                + s.crlf + N'        and l4.__$operation in (1, 2)'
                + s.crlf + N'       having min(l4.__$operation) = 2'
                + s.crlf + N'     )'
                + s.crlf + N'    )'
                + s.crlf + N'   )'
                + s.crlf + N''
                + s.crlf + N' union all'
                + s.crlf + N''
                + s.crlf + N' select'
                + s.crlf + N'  l.[__$start_lsn],'
                + s.crlf + N'  case'
                + s.crlf + N'   when l.__$operation = 1'
                + s.crlf + N'    then 1'
                + s.crlf + N'   when exists ('
                + s.crlf + N'      select *'
                + s.crlf + N'      from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
                -- "where 1 = 1" lets every key predicate start with "and", so no
                -- whitespace-dependent stuff() offset is needed to strip the first one.
                + s.crlf + N'      where 1 = 1'

        + (
         select
                s.crlf + N'       and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'         and l4.__$seqval = x.__$seqval_min'
                + s.crlf + N'         and l4.__$operation in (1, 2)'
                + s.crlf + N'        having min(l4.__$operation) = 2'
                + s.crlf + N'       )'
                + s.crlf + N'    then 2'
                + s.crlf + N'    else 4'
                + s.crlf + N'  end as __$operation,'
                + s.crlf + N'  x.__$update_mask'

        + (
         select N','
        + s.crlf + N'  l.' + quotename(col.column_name) as [text()]
         from cdc.captured_columns col
         where col.object_id = ct.object_id
         order by col.column_ordinal
         for xml path(''), type
        ).value('.','nvarchar(max)')

                + s.crlf + N' from ('
                + s.crlf + N'   select top (1) NULL as c'
                + s.crlf + N'   from ('
                + s.crlf + N'     select'
                + s.crlf + N'      lower(rtrim(ltrim(@row_filter_option))),'
                + s.crlf + N'      N' + quotename( ct.capture_instance, '''')
                + s.crlf + N'    ) p ([filter_option], [capture_instance])'
                + s.crlf + N'   where p.[filter_option] = N''all with mask'''
                + s.crlf + N'       and [sys].[fn_cdc_check_parameters]( p.[capture_instance], @from_lsn, @to_lsn, p.filter_option, 1) = 1'
                + s.crlf + N'  ) c'
                + s.crlf + N'  cross join ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l -- Last operation per key value.'
                + s.crlf + N'  inner join ('
                + s.crlf + N'    select'

        + (
         select
                + s.crlf + N'     m.' + quotename(ic.column_name) + ',' as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'     max(m.__$seqval) as __$seqval_max, -- Sequence value for Last operation.'
                + s.crlf + N'     min(m.__$seqval) as __$seqval_min, -- Sequence value for First operation.'
                + s.crlf + N'     [sys].[ORMask](m.__$update_mask) as __$update_mask -- bitwise OR of all update_masks.'
                + s.crlf + N'    from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' m'
                + s.crlf + N'    where m.__$start_lsn >= @from_lsn'
                + s.crlf + N'     and m.__$start_lsn <= @to_lsn'
                + s.crlf + N'     and m.__$operation in (1,2,4)'
                + s.crlf + N'    group by'

        + stuff((
         select
           N','
                + s.crlf + N'     m.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)'), 1, 1, '')

                + s.crlf + N'   ) x on ('

        + stuff((
         select N' and x.' + quotename(ic.column_name) + ' = l.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)'), 1, 5, '') + N' and x.__$seqval_max = l.__$seqval)'

                + s.crlf + N' where l.__$start_lsn >= @from_lsn'
                + s.crlf + N'  and l.__$start_lsn <= @to_lsn'
                + s.crlf + N'            and (l.__$operation = 4  -- New values for updates.'
                + s.crlf + N'    or l.__$operation = 2 -- Inserted values or New values for deferred updates.'
                + s.crlf + N'    or (l.__$operation = 1 -- Old values for deferred updates or deletes.'
                + s.crlf + N'      and not exists ('
                + s.crlf + N'        -- Exclude all deletes that are the old values part of a deferred update.'
                + s.crlf + N'        -- We only use the other part of the deferred update, the row with __$operation = 2.'
                + s.crlf + N'        select *'
                + s.crlf + N'        from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l3'
                -- "where 1 = 1" lets every key predicate start with "and", so no
                -- whitespace-dependent stuff() offset is needed to strip the first one.
                + s.crlf + N'        where 1 = 1'

        + (
         select
                s.crlf + N'         and l3.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'         and l3.__$seqval = x.__$seqval_max'
                + s.crlf + N'         and l3.__$operation = 2'
                + s.crlf + N'       )'
                + s.crlf + N'      and not exists ('
                + s.crlf + N'       -- If there is only 1 sequence for row ''l'', we don''t need to check for previous'
                + s.crlf + N'       -- actions (hence the not x.__$seqval_max = x.__$seqval_min). But if multiple'
                + s.crlf + N'       -- sequences do exist, we need to exclude those deletes for rows that were'
                + s.crlf + N'       -- first inserted within this same range (__$seqval_min). Net result should'
                + s.crlf + N'       -- look as if the row never existed at all. But again, we need to account for'
                + s.crlf + N'       -- deferred updates here too: if both __$operation = 1 and __$operation = 2'
                + s.crlf + N'       -- exists the first operation is a deferred update instead of an insert and'
                + s.crlf + N'       -- we should include the delete(1) operation l.'
                + s.crlf + N'       select *'
                + s.crlf + N'       from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
                + s.crlf + N'       where not x.__$seqval_max = x.__$seqval_min'

        + (
         select
                + s.crlf + N'        and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
         from cdc.index_columns ic
         where ic.object_id = ct.object_id
         order by ic.index_ordinal
         for xml path(''), type
                 ).value('.','nvarchar(max)')

                + s.crlf + N'        and l4.__$seqval = x.__$seqval_min'
                + s.crlf + N'        and l4.__$operation in (1, 2)'
                + s.crlf + N'       having min(l4.__$operation) = 2'
                + s.crlf + N'      )'
                + s.crlf + N'     )'
                + s.crlf + N'    )' as [definition]

            into #definitions

            from (
                 select char(0x0d) + char(0x0a)
             ) s (crlf)
             cross join cdc.change_tables ct
       where ct.supports_net_changes = 1;

       if object_id('tempdb..#todo') is not null
           drop table #todo;

       raiserror('Comparing to existing functions...', 0, 0) with nowait;

            select
             d.functionName,
             case when object_id(d.functionName) is null
                 then 'Missing'
                 else
                     case when nullif(checksum(x.txtNew),checksum(x.txtOld)) is not null
                        then 'Different'
                        else 'OK'
                     end
             end as [Status],
             d.definition
            into #todo
            from #definitions d
             cross apply (
                     select
                        replace(
                        replace(
                        replace(
                        replace(
                        replace(
                        replace(
                         d.definition,
                         char(0x0d), ' '),
                         char(0x0a), ' '),
                         char(9), ' '),
                         ' ', ' ' + char(9)),
                         char(9) + ' ', ''),
                         char(9), '') as txtNew,

                        replace(
                        replace(
                        replace(
                        replace(
                        replace(
                        replace(
                         (select m.definition from sys.sql_modules m where object_id = object_id(d.functionName)),
                         char(0x0d), ' '),
                         char(0x0a), ' '),
                         char(9), ' '),
                         ' ', ' ' + char(9)),
                         char(9) + ' ', ''),
                         char(9), '') as txtOld
                 ) x;

            declare cur cursor local fast_forward
            for
            select
             t.functionName,
             case t.Status
                 when 'Different' then replace(t.definition, 'CREATE FUNCTION', 'ALTER FUNCTION')
                 when 'Missing' then t.definition
             end
            from #todo t
            where nullif('OK', t.Status) is not null
            order by t.functionName;

            open cur;

            while 1 = 1
            begin
             declare @functionName sysname;
             declare @stmt nvarchar(max);

             fetch next from cur into @functionName, @stmt;
             if @@fetch_status = -1
                 break;

             if @@fetch_status = 0
             begin
                 raiserror('Processing %s...', 0, 0, @functionName) with nowait;

                 exec sp_executesql @stmt;
             end
            end

            close cur;
            deallocate cur;

            raiserror('Done.', 0, 0) with nowait;
    end

    exec [dbo].[spGenerateCustomCDCnetFunctions]
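
    Once generated, the functions are called like the built-in cdc.fn_cdc_get_net_changes_<capture_instance> ones. A usage sketch, assuming a hypothetical capture instance named dbo_MyTable, for which the procedure above would generate cdc.fn_rpr_get_net_changes_dbo_MyTable:

    ```sql
    -- dbo_MyTable is a placeholder capture instance name, not one from this thread.
    declare @from_lsn binary(10) = sys.fn_cdc_get_min_lsn(N'dbo_MyTable');
    declare @to_lsn   binary(10) = sys.fn_cdc_get_max_lsn();

    -- Filter options handled by the generated code:
    -- N'all', N'all with mask' and N'all with merge'.
    select *
    from cdc.fn_rpr_get_net_changes_dbo_MyTable(@from_lsn, @to_lsn, N'all');
    ```

    Running the same LSN range through the built-in function and comparing the two result sets is one way to see which rows the alternative suppresses.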



    Posting Data Etiquette - Jeff Moden
    Posting Performance Based Questions - Gail Shaw
    Hidden RBAR - Jeff Moden
    Cross Tabs and Pivots - Jeff Moden
    Catch-all queries - Gail Shaw


    If you don't have time to do it right, when will you have time to do it over?
