Home Forums SQL Server 2012 SQL Server 2012 - T-SQL How can I force a deferred update to create two rows in change data capture? RE: How can I force a deferred update to create two rows in change data capture?

  • The extra where clause that MS added to fix the deferred update issue does functionally fix the issue on small(ish) numbers of rows in the _CT tables. When however I try to run it on a table with 500.000 rows in the _CT table, the merge query that used to run in +- 30 seconds, doesn't finish after having run for at least a day and a half... 🙁

    I've created a script that generates my own version of the net changes functions as cdc.fn_get_net_changes_JFH_<capture instance>(). (JFH is my employer). I'm not completely done testing it functionally, and the generated functions only support 'all with merge', but this version both seems to fix the deferred-update issue and performs well, even with larger change sets. I've kept the functions compatible with MS' net_changes functions, so a global search and replace of the function names should be sufficient to switch from using one or the other in your sources. You're welcome to test it and maybe give comments?

    declare cur cursor local forward_only

    for

    select

    N'create function [cdc].' + quotename('fn_cdc_get_net_changes_JFH_' + ct.capture_instance )

    + crlf + N'(@from_lsn binary(10),'

    + crlf + N'@to_lsn binary(10),'

    + crlf + N'@row_filter_option nvarchar(30)'

    + crlf + N')'

    + crlf + N'returns table'

    + crlf + N'return'

    + crlf + N''

    + crlf + N' with cte as ('

    + crlf + N' select'

    + crlf + N' t1.[__$start_lsn],'

    + crlf + N' t1.[__$end_lsn],'

    + crlf + N' t1.[__$seqval],'

    + crlf + N' x.[__$operation]'

    + (

    select N','

    + crlf + N' t1.' + quotename(col.column_name) as [text()]

    from cdc.captured_columns col

    where col.object_id = ct.object_id

    -- and col.is_computed = 0

    order by col.column_ordinal

    for xml path(''), type

    ).value('.','nvarchar(max)')

    + crlf + N' from [cdc].' + quotename(ct.capture_instance + '_CT') + ' t1 with (nolock)'

    + crlf + N' outer apply ('

    + crlf + N' select'

    + crlf + N' case'

    + crlf + N' when exists ('

    + crlf + N' select *'

    + crlf + N' from [cdc].' + quotename(ct.capture_instance + '_CT') + ' t2 with (nolock)'

    + crlf + N' where t2.[__$start_lsn] = t1.[__$start_lsn]'

    + crlf + N' and t2.[__$seqval] = t1.[__$seqval]'

    + crlf + N' and t2.[__$operation] = case t1.[__$operation] when 1 then 2 when 2 then 1 end'

    + (

    select

    crlf + N' and t2.' + quotename(ic.column_name) + ' = t1.' + quotename(ic.column_name) as [text()]

    from cdc.index_columns ic

    where ic.object_id = ct.object_id

    order by ic.index_ordinal

    for xml path(''), type

    ).value('.','nvarchar(max)')

    + crlf + N' )'

    + crlf + N' then'

    + crlf + N' case t1.[__$operation]'

    + crlf + N' when 1 then 3'

    + crlf + N' when 2 then 4'

    + crlf + N' end'

    + crlf + N' else'

    + crlf + N' t1.[__$operation]'

    + crlf + N' end as [__$operation]'

    + crlf + N' where (t1.[__$operation] = 1 or t1.[__$operation] = 2)'

    + crlf + N''

    + crlf + N' union all'

    + crlf + N''

    + crlf + N' select t1.[__$operation]'

    + crlf + N' where (t1.[__$operation] = 3 or t1.[__$operation] = 4)'

    + crlf + N' ) x'

    + crlf + N' )'

    + crlf + N''

    + crlf + N'select'

    + crlf + N' NULL as __$start_lsn,'

    + crlf + N' NULL as __$operation,'

    + crlf + N' NULL as __$update_mask'

    + (

    select N','

    + crlf + N' NULL as ' + quotename(col.column_name) as [text()]

    from cdc.captured_columns col

    where col.object_id = ct.object_id

    -- and col.is_computed = 0

    order by col.column_ordinal

    for xml path(''), type

    ).value('.','nvarchar(max)')

    + crlf + N'where ( [sys].[fn_cdc_check_parameters]( N' + quotename(ct.capture_instance, '''') + ', @from_lsn, @to_lsn, lower(rtrim(ltrim(@row_filter_option))), 1) = 0)'

    + crlf + N''

    + crlf + N'union all'

    + crlf + N''

    + crlf + N' select'

    + crlf + N' tbl.__$start_lsn,'

    + crlf + N' case tbl.__$operation'

    + crlf + N' when 1 then 1'

    + crlf + N' else 5'

    + crlf + N' end as __$operation,'

    + crlf + N' null as __$update_mask'

    + (

    select N','

    + crlf + N' tbl.' + quotename(col.column_name) as [text()]

    from cdc.captured_columns col

    where col.object_id = ct.object_id

    -- and col.is_computed = 0

    order by col.column_ordinal

    for xml path(''), type

    ).value('.','nvarchar(max)')

    + crlf + N' from cte tbl with (nolock)'

    + crlf + N' inner join ('

    + crlf + N' select'

    + (

    select

    crlf + N' ct.' + quotename(ic.column_name) + ',' as [text()]

    from cdc.index_columns ic

    where ic.object_id = ct.object_id

    order by ic.index_ordinal

    for xml path(''), type

    ).value('.','nvarchar(max)')

    + crlf + N' min( case ct.__$operation when 2 then ct.__$seqval end) as insert_min_seqval,'

    + crlf + N' max( ct.__$seqval) as max_seqval,'

    + crlf + N' min( ct.__$seqval) as min_seqval'

    + crlf + N' from cte ct with (nolock)'

    + crlf + N' where lower(rtrim(ltrim(@row_filter_option))) = N''all with merge'''

    + crlf + N' and ( [sys].[fn_cdc_check_parameters]( N' + quotename(ct.capture_instance, '''') + ', @from_lsn, @to_lsn, lower(rtrim(ltrim(@row_filter_option))), 1) = 1)'

    + crlf + N' and ct.__$start_lsn >= @from_lsn'

    + crlf + N' and ct.__$start_lsn <= @to_lsn'

    + crlf + N' and (ct.__$operation = 1 or ct.__$operation = 2 or ct.__$operation = 4)'

    + crlf + N' group by'

    + stuff((

    select N','

    + crlf + N' ct.' + quotename(ic.column_name) as [text()]

    from cdc.index_columns ic

    where ic.object_id = ct.object_id

    order by ic.index_ordinal

    for xml path(''), type

    ).value('.','nvarchar(max)'), 1, 1, '')

    + crlf + N' ) t on ('

    + crlf + N' '

    + stuff((

    select

    crlf + N' and t.' + quotename(ic.column_name) + N' = tbl.' + quotename(ic.column_name) as [text()]

    from cdc.index_columns ic

    where ic.object_id = ct.object_id

    order by ic.index_ordinal

    for xml path(''), type

    ).value('.','nvarchar(max)'), 1, 15, '')

    + crlf + N' and t.max_seqval = tbl.__$seqval'

    + crlf + N' and tbl.__$start_lsn >= @from_lsn'

    + crlf + N' and tbl.__$start_lsn <= @to_lsn'

    + crlf + N' and (tbl.__$operation = 2 -- Insert,'

    + crlf + N' or tbl.__$operation = 4 -- Update or'

    + crlf + N' or (tbl.__$operation = 1 -- Delete ...'

    + crlf + N' and (t.insert_min_seqval is null or t.insert_min_seqval <> t.min_seqval) -- ... and existed before this batch already.'

    + crlf + N' )))'

    from (

    select char(0x0d) + char(0x0a)

    ) s (crlf)

    cross join cdc.change_tables ct;

    open cur;

    while 1 = 1

    begin

    declare @stmt nvarchar(max);

    fetch next from cur into @stmt;

    if @@fetch_status = -1

    break;

    if @@fetch_status = 0

    begin

    exec sp_executesql @stmt;

    end

    end

    close cur;

    deallocate cur;

    edit: Changed code to use cdc.captured_columns and cdc.index_columns instead of sys.columns and sys.indexes & sys.index_columns.



    Posting Data Etiquette - Jeff Moden[/url]
    Posting Performance Based Questions - Gail Shaw[/url]
    Hidden RBAR - Jeff Moden[/url]
    Cross Tabs and Pivots - Jeff Moden[/url]
    Catch-all queries - Gail Shaw[/url]


    If you don't have time to do it right, when will you have time to do it over?