The extra WHERE clause that MS added to fix the deferred-update issue does functionally fix it for small(ish) numbers of rows in the _CT tables. However, when I run it against a table with 500,000 rows in the _CT table, the merge query that used to run in roughly 30 seconds still hasn't finished after running for at least a day and a half... 🙁
I've created a script that generates my own version of the net changes functions, named cdc.fn_cdc_get_net_changes_JFH_<capture instance>() (JFH is my employer). I'm not completely done testing it functionally, and the generated functions only support 'all with merge', but this version seems both to fix the deferred-update issue and to perform well, even with larger change sets. I've kept the functions compatible with MS' net changes functions, so a global search and replace of the function names should be sufficient to switch between the two in your sources. You're welcome to test it and give comments.
-- Generator script: for every capture instance in cdc.change_tables that has
-- key columns registered in cdc.index_columns, build and execute a
--   create function [cdc].[fn_cdc_get_net_changes_JFH_<capture_instance>]
--       (@from_lsn binary(10), @to_lsn binary(10), @row_filter_option nvarchar(30))
-- statement (inline table-valued function).
--
-- Shape of each generated function:
--   * cte: reads the <capture_instance>_CT table and re-labels __$operation:
--     a delete (1) / insert (2) row that has a counterpart row of the opposite
--     kind with the same __$start_lsn, __$seqval and key columns is reported
--     as 3 / 4 instead; rows that already carry 3 or 4 pass through unchanged.
--   * first select: an all-NULL row that is only produced when
--     sys.fn_cdc_check_parameters(...) returns 0.
--   * second select: per key, the row with the highest __$seqval inside
--     [@from_lsn, @to_lsn]; __$operation is returned as 1 for deletes and 5
--     for everything else. Rows are only produced when @row_filter_option
--     (trimmed, lower-cased) equals 'all with merge'.
--
-- NOTE: the statement is a plain "create function", so it fails for a capture
-- instance whose JFH function already exists; drop it first when regenerating.
declare @stmt nvarchar(max);

declare cur cursor local forward_only
for
    select
        -- Function header and parameter list.
        N'create function [cdc].' + quotename('fn_cdc_get_net_changes_JFH_' + ct.capture_instance )
        + crlf + N'(@from_lsn binary(10),'
        + crlf + N'@to_lsn binary(10),'
        + crlf + N'@row_filter_option nvarchar(30)'
        + crlf + N')'
        + crlf + N'returns table'
        + crlf + N'return'
        + crlf + N''
        -- cte: change rows with the re-labelled __$operation (see header).
        + crlf + N' with cte as ('
        + crlf + N' select'
        + crlf + N' t1.[__$start_lsn],'
        + crlf + N' t1.[__$end_lsn],'
        + crlf + N' t1.[__$seqval],'
        + crlf + N' x.[__$operation]'
        -- One ", t1.[column]" entry per captured column, in ordinal order.
        + (
            select N','
                + crlf + N' t1.' + quotename(col.column_name) as [text()]
            from cdc.captured_columns col
            where col.object_id = ct.object_id
            -- and col.is_computed = 0
            order by col.column_ordinal
            for xml path(''), type
        ).value('.','nvarchar(max)')
        + crlf + N' from [cdc].' + quotename(ct.capture_instance + '_CT') + ' t1 with (nolock)'
        + crlf + N' outer apply ('
        + crlf + N' select'
        + crlf + N' case'
        + crlf + N' when exists ('
        + crlf + N' select *'
        + crlf + N' from [cdc].' + quotename(ct.capture_instance + '_CT') + ' t2 with (nolock)'
        + crlf + N' where t2.[__$start_lsn] = t1.[__$start_lsn]'
        + crlf + N' and t2.[__$seqval] = t1.[__$seqval]'
        + crlf + N' and t2.[__$operation] = case t1.[__$operation] when 1 then 2 when 2 then 1 end'
        -- The counterpart row must also match on every key column.
        + (
            select
                crlf + N' and t2.' + quotename(ic.column_name) + ' = t1.' + quotename(ic.column_name) as [text()]
            from cdc.index_columns ic
            where ic.object_id = ct.object_id
            order by ic.index_ordinal
            for xml path(''), type
        ).value('.','nvarchar(max)')
        + crlf + N' )'
        + crlf + N' then'
        + crlf + N' case t1.[__$operation]'
        + crlf + N' when 1 then 3'
        + crlf + N' when 2 then 4'
        + crlf + N' end'
        + crlf + N' else'
        + crlf + N' t1.[__$operation]'
        + crlf + N' end as [__$operation]'
        + crlf + N' where (t1.[__$operation] = 1 or t1.[__$operation] = 2)'
        + crlf + N''
        + crlf + N' union all'
        + crlf + N''
        + crlf + N' select t1.[__$operation]'
        + crlf + N' where (t1.[__$operation] = 3 or t1.[__$operation] = 4)'
        + crlf + N' ) x'
        + crlf + N' )'
        + crlf + N''
        -- First branch: all-NULL row, emitted only when the parameter check returns 0.
        + crlf + N'select'
        + crlf + N' NULL as __$start_lsn,'
        + crlf + N' NULL as __$operation,'
        + crlf + N' NULL as __$update_mask'
        + (
            select N','
                + crlf + N' NULL as ' + quotename(col.column_name) as [text()]
            from cdc.captured_columns col
            where col.object_id = ct.object_id
            -- and col.is_computed = 0
            order by col.column_ordinal
            for xml path(''), type
        ).value('.','nvarchar(max)')
        + crlf + N'where ( [sys].[fn_cdc_check_parameters]( N' + quotename(ct.capture_instance, '''') + ', @from_lsn, @to_lsn, lower(rtrim(ltrim(@row_filter_option))), 1) = 0)'
        + crlf + N''
        + crlf + N'union all'
        + crlf + N''
        -- Second branch: the net change rows.
        + crlf + N' select'
        + crlf + N' tbl.__$start_lsn,'
        + crlf + N' case tbl.__$operation'
        + crlf + N' when 1 then 1'
        + crlf + N' else 5'
        + crlf + N' end as __$operation,'
        + crlf + N' null as __$update_mask'
        + (
            select N','
                + crlf + N' tbl.' + quotename(col.column_name) as [text()]
            from cdc.captured_columns col
            where col.object_id = ct.object_id
            -- and col.is_computed = 0
            order by col.column_ordinal
            for xml path(''), type
        ).value('.','nvarchar(max)')
        + crlf + N' from cte tbl with (nolock)'
        -- Derived table t: per key, the first/last __$seqval in the LSN window
        -- and the first insert's __$seqval (NULL when the window has no insert).
        + crlf + N' inner join ('
        + crlf + N' select'
        + (
            select
                crlf + N' ct.' + quotename(ic.column_name) + ',' as [text()]
            from cdc.index_columns ic
            where ic.object_id = ct.object_id
            order by ic.index_ordinal
            for xml path(''), type
        ).value('.','nvarchar(max)')
        + crlf + N' min( case ct.__$operation when 2 then ct.__$seqval end) as insert_min_seqval,'
        + crlf + N' max( ct.__$seqval) as max_seqval,'
        + crlf + N' min( ct.__$seqval) as min_seqval'
        + crlf + N' from cte ct with (nolock)'
        + crlf + N' where lower(rtrim(ltrim(@row_filter_option))) = N''all with merge'''
        + crlf + N' and ( [sys].[fn_cdc_check_parameters]( N' + quotename(ct.capture_instance, '''') + ', @from_lsn, @to_lsn, lower(rtrim(ltrim(@row_filter_option))), 1) = 1)'
        + crlf + N' and ct.__$start_lsn >= @from_lsn'
        + crlf + N' and ct.__$start_lsn <= @to_lsn'
        + crlf + N' and (ct.__$operation = 1 or ct.__$operation = 2 or ct.__$operation = 4)'
        + crlf + N' group by'
        -- stuff(..., 1, 1, '') drops the leading comma of the first key column.
        + stuff((
            select N','
                + crlf + N' ct.' + quotename(ic.column_name) as [text()]
            from cdc.index_columns ic
            where ic.object_id = ct.object_id
            order by ic.index_ordinal
            for xml path(''), type
        ).value('.','nvarchar(max)'), 1, 1, '')
        + crlf + N' ) t on ('
        -- The fixed predicate goes first so that every key predicate can keep its
        -- leading "and". This replaces the former stuff(..., 1, 15, ''), whose
        -- hard-coded length had to match the exact width of the literal prefix
        -- and silently corrupted the first key predicate whenever it did not.
        + crlf + N' t.max_seqval = tbl.__$seqval'
        + (
            select
                crlf + N' and t.' + quotename(ic.column_name) + N' = tbl.' + quotename(ic.column_name) as [text()]
            from cdc.index_columns ic
            where ic.object_id = ct.object_id
            order by ic.index_ordinal
            for xml path(''), type
        ).value('.','nvarchar(max)')
        + crlf + N' and tbl.__$start_lsn >= @from_lsn'
        + crlf + N' and tbl.__$start_lsn <= @to_lsn'
        + crlf + N' and (tbl.__$operation = 2 -- Insert,'
        + crlf + N' or tbl.__$operation = 4 -- Update or'
        + crlf + N' or (tbl.__$operation = 1 -- Delete ...'
        + crlf + N' and (t.insert_min_seqval is null or t.insert_min_seqval <> t.min_seqval) -- ... and existed before this batch already.'
        + crlf + N' )))'
    from (
        select char(0x0d) + char(0x0a)
    ) s (crlf)
    cross join cdc.change_tables ct
    -- Without key columns the index-column sub-selects above yield NULL, which
    -- turns the whole concatenated statement into NULL; skip such capture
    -- instances explicitly instead of handing a NULL statement to sp_executesql.
    where exists (
        select *
        from cdc.index_columns ic
        where ic.object_id = ct.object_id
    );

open cur;

while 1 = 1
begin
    fetch next from cur into @stmt;

    -- -1: no more rows.
    if @@fetch_status = -1
        break;

    -- Any status other than 0 (e.g. -2, row missing) is skipped without executing.
    if @@fetch_status = 0
    begin
        exec sp_executesql @stmt;
    end
end

close cur;
deallocate cur;
edit: Changed code to use cdc.captured_columns and cdc.index_columns instead of sys.columns and sys.indexes & sys.index_columns.