January 15, 2011 at 1:23 pm
Comments posted to this topic are about the item Incremental Data Loading Using CDC
January 17, 2011 at 3:40 am
Hi Mark,
Thanks for writing this article it was interesting to see a different way of implementing CDC incremental loading. I have also implemented an incremental load using CDC and I found that the really important part is ensuring synchronisation between the data warehouse tables and the tracked tables. You mention a couple of methods for resynchronisation in your article and I have an additional method which works quite reliably for me which I thought you might be interested in.
In my solution I have a SQL replication step between the production OLTP and the data warehouse. This gives me control over the data flow into the monitored tables and allows me to pause replication apply changes to the data warehouse. Once finished I can then restart the data flow and CDC monitoring without losing any of the change data. This of course requires the replication subscriptions to be setup not to expire during the stoppage.
Cheers
Martin
January 17, 2011 at 5:49 am
Nice article, thanks for sharing.
Need an answer? No, you need a question
My blog at https://sqlkover.com.
MCSE Business Intelligence - Microsoft Data Platform MVP
January 17, 2011 at 5:54 am
got knowledge of incremental data loading thanks for the post
January 17, 2011 at 7:29 am
Mark:
Great article. Nice clarification and demonstration of CDC.
It has often been a problem explaining to programmers the
intent of CDC and to execute the project correctly.
One note, there are no footnotes to your previsous two article.
For completeness in discussing CDC, can you either post or send
them to me.
Thanks!
January 17, 2011 at 10:56 am
January 17, 2011 at 11:52 am
thanks, Mark!
March 5, 2012 at 1:42 pm
I read the article about Incremental Data Loading using CDC - but can you create an article and demo step by step on how to capture changes from a bulk load and incremental load from a source that does not have CDC (like SQL Server 2005). most of my tables don't have datetime column so I don't know if the record is new or updated by using datetime alone. Can you help me?
I need to do a bulk load initially from 3 servers merged into one server database and perform incremental updates realtime or each night.
June 7, 2013 at 2:54 am
Still a good article, but would be even better if you addressed some of the potential pitfalls of it.
Like runaway transaction log growth if you stop being able to clean out the system properly.
June 12, 2013 at 11:54 am
Hello,
Great article, I would like to know more about how to handle disruptions:
- Server crash
- SQL Server instance restart
- CDC package fails half way
- CDC Package fails due to time out or network error
- CDC Package fails due to transaction log full
- Source table DDL Changes like add a column and populate it
Should I just rerun the CDC in Reprocessing mode?
In my scenario I track a table which is updated once a day.
BI Guy
June 12, 2013 at 8:23 pm
I did some testing.
added the reprocessing column option to the cdc source.
Performed a couple changes in DDL and DML over the tracked table.
Ran the CDC update package which failed. The time out was 520 for the test.
The incremental CDC package would now fail with 1 warning about reprocessing mode and a time out error.
Truncating the target table and running a 10 min initial load was the only way I found to get the incremental update to work again.
BI Guy
June 20, 2017 at 3:03 am
I've run cdc for incremental loading in several projects and I've found that it a) has some bugs, plus b) it has performance issues with larger datasets. In a MS Connect item I've reported these issues to Microsoft, including a procedure generating alternative functions for the fn_cdc_get_net_changes_<capture_instance>() functions that do return correct results to illustrate the problems. The attachment does however not (yet) show in Connect, so I share it here too hoping you can all vote-up my connect item for MS to seriously consider fixing the log reader....
Below procedure is the attachment I've sent to MS. It generates alternative fn_cdc_get_net_changes_...() functions that will not return superfluous rows with __$operation = 1 generated from deferred updates. The generated functions are highly optimised to be as fast as, if no faster than, MS' "fixed" version. However if your data has a lot of 'deleted' rows (__$operation equals 1) the fix requires an additional scan on the _CT table MS' version does not require, so in these situations the fix may cost you a little performance. The better solution would be MS fixing the log reader to never put defered updates as pairs of 1 & 2 into the _CT tables.... In which case the fn_cdc_get_net_changes functions can get really blazing fast, even for large data sets!CREATE PROCEDURE [dbo].[spGenerateCustomCDCnetFunctions]
as
begin
set nocount on;
if object_id('tempdb..#definitions') is not null
drop table #definitions;
raiserror('Generating function bodies...', 0, 0) with nowait;
select
ct.object_id,
'[cdc].' + quotename('fn_rpr_get_net_changes_' + ct.capture_instance ) as functionName,
N'CREATE FUNCTION [cdc].' + quotename('fn_rpr_get_net_changes_' + ct.capture_instance)
+ s.crlf + N'( @from_lsn binary(10),'
+ s.crlf + N' @to_lsn binary(10),'
+ s.crlf + N' @row_filter_option nvarchar(30)'
+ s.crlf + N')'
+ s.crlf + N'returns table'
+ s.crlf + N'return'
+ s.crlf + N''
+ s.crlf + N' select'
+ s.crlf + N' NULL as __$start_lsn,'
+ s.crlf + N' NULL as __$operation,'
+ s.crlf + N' NULL as __$update_mask'
+ (
select N','
+ s.crlf + N' NULL as ' + quotename(col.column_name) as [text()]
from cdc.captured_columns col
where col.object_id = ct.object_id
order by col.column_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' where ( [sys].[fn_cdc_check_parameters]( N' + quotename(ct.capture_instance, '''') + ', @from_lsn, @to_lsn, lower(rtrim(ltrim(@row_filter_option))), 1) = 0)'
+ s.crlf + N''
+ s.crlf + N' union all'
+ s.crlf + N''
+ s.crlf + N''
+ s.crlf + N' select'
+ s.crlf + N' l.[__$start_lsn],'
+ s.crlf + N' case'
+ s.crlf + N' when l.__$operation = 1 then 1'
+ s.crlf + N' when l.__$operation = 2 then 5'
+ s.crlf + N' when l.__$operation = 4 then 5'
+ s.crlf + N' end as __$operation,'
+ s.crlf + N' NULL as [__$update_mask]'
+ (
select N','
+ s.crlf + N' l.' + quotename(col.column_name) as [text()]
from cdc.captured_columns col
where col.object_id = ct.object_id
order by col.column_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' from ('
+ s.crlf + N' select top (1) NULL as c'
+ s.crlf + N' from ('
+ s.crlf + N' select'
+ s.crlf + N' lower(rtrim(ltrim(@row_filter_option))),'
+ s.crlf + N' N' + quotename( ct.capture_instance, '''')
+ s.crlf + N' ) p ([filter_option], [capture_instance])'
+ s.crlf + N' where p.[filter_option] = N''all with merge'''
+ s.crlf + N' and [sys].[fn_cdc_check_parameters]( p.[capture_instance], @from_lsn, @to_lsn, p.filter_option, 1) = 1'
+ s.crlf + N' ) c'
+ s.crlf + N' cross join ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l -- Last operation per key value.'
+ s.crlf + N' inner join ('
+ s.crlf + N' select'
+ (
select
s.crlf + N' m.' + quotename(ic.column_name) + ',' as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' max(m.__$seqval) as __$seqval_max, -- Sequence value for Last operation.'
+ s.crlf + N' min(m.__$seqval) as __$seqval_min -- Sequence value for First operation.'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' m'
+ s.crlf + N' where m.__$start_lsn >= @from_lsn'
+ s.crlf + N' and m.__$start_lsn <= @to_lsn'
+ s.crlf + N' and m.__$operation in (1,2,4)'
+ s.crlf + N' group by'
+ stuff((
select
N','
+ s.crlf + N' m.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 1, '')
+ s.crlf + N' ) x on ('
+ stuff((
select N' and x.' + quotename(ic.column_name) + ' = l.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 5, '') + N' and x.__$seqval_max = l.__$seqval)'
+ s.crlf + N' where l.__$start_lsn >= @from_lsn'
+ s.crlf + N' and l.__$start_lsn <= @to_lsn'
+ s.crlf + N' and (l.__$operation = 4 -- New values for updates.'
+ s.crlf + N' or l.__$operation = 2 -- Inserted values or New values for defered updates.'
+ s.crlf + N' or (l.__$operation = 1 -- Old values for defered updates or deletes.'
+ s.crlf + N' and not exists ('
+ s.crlf + N' -- Suppress all deletes that are the old values part of a defered update.'
+ s.crlf + N' -- We only use the other part of the defered update, the row with __$operation = 2.'
+ s.crlf + N' select *'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l3'
+ s.crlf + N' where'
+ stuff((
select
+ s.crlf + N' and l3.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 32, '')
+ s.crlf + N' and l3.__$seqval = x.__$seqval_max'
+ s.crlf + N' and l3.__$operation = 2'
+ s.crlf + N' )'
+ s.crlf + N' and not exists ('
+ s.crlf + N' -- If there is only 1 sequence for row ''l'', we don''t need to check for previous'
+ s.crlf + N' -- actions (hence the not x.__$seqval_max = x.__$seqval_min). But if multiple'
+ s.crlf + N' -- sequences do exist, we need to suppress those deletes for rows that were'
+ s.crlf + N' -- first inserted within this same range (__$seqval_min). Net result should'
+ s.crlf + N' -- look as if the row never existed at all. But again, we need to exclude'
+ s.crlf + N' -- deferred updates here too: if both __$operation = 1 and __$operation = 2'
+ s.crlf + N' -- exists it is a defered update instead of an insert.'
+ s.crlf + N' select *'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
+ s.crlf + N' where not x.__$seqval_max = x.__$seqval_min'
+ (
select
+ s.crlf + N' and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' and l4.__$seqval = x.__$seqval_min'
+ s.crlf + N' and l4.__$operation in (1, 2)'
+ s.crlf + N' having min(l4.__$operation) = 2'
+ s.crlf + N' )'
+ s.crlf + N' )'
+ s.crlf + N' )'
+ s.crlf + N''
+ s.crlf + N' union all'
+ s.crlf + N''
+ s.crlf + N' select'
+ s.crlf + N' l.[__$start_lsn],'
+ s.crlf + N' case'
+ s.crlf + N' when l.__$operation = 1 -- We have already excluded all'
+ s.crlf + N' then 1 -- deletes of rows that did not'
+ s.crlf + N' -- exist before the batch started'
+ s.crlf + N' -- in the where conditions. So any'
+ s.crlf + N' -- delete(1) __$operation here must'
+ s.crlf + N' -- be a net delete(1) of a row that'
+ s.crlf + N' -- existed before the batch.'
+ s.crlf + N' when exists ('
+ s.crlf + N' -- If the l.__$operation is not a delete(1),'
+ s.crlf + N' -- the net __$operation depends on what the'
+ s.crlf + N' -- first __$operation in this batch is. If'
+ s.crlf + N' -- the first __$operation is an insert(2), and'
+ s.crlf + N' -- not one that is part of a defered update,'
+ s.crlf + N' -- then the net __$operation is an insert(2).'
+ s.crlf + N' -- Otherwise the row existed before this batch'
+ s.crlf + N' -- started and thus the net __$operation is an'
+ s.crlf + N' -- update(4). For a defered update both an'
+ s.crlf + N' -- insert(2) and a delete(1) __$operation'
+ s.crlf + N' -- exists, for an insert only an insert(2)'
+ s.crlf + N' -- __$operation exists. So I''m looking for'
+ s.crlf + N' -- a single __$operation 2 on the first operation.'
+ s.crlf + N' -- I can''t optimize for the single operation'
+ s.crlf + N' -- situation here, because the last operation'
+ s.crlf + N' -- may be a defered update, giving'
+ s.crlf + N' -- l.__$operation = 2 whereas the net result'
+ s.crlf + N' -- should be update(4).'
+ s.crlf + N' select *'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
+ s.crlf + N' where'
+ stuff((
select
+ s.crlf + N' and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 26, '')
+ s.crlf + N' and l4.__$seqval = x.__$seqval_min'
+ s.crlf + N' and l4.__$operation in (1, 2)'
+ s.crlf + N' having min(l4.__$operation) = 2'
+ s.crlf + N' )'
+ s.crlf + N' then 2'
+ s.crlf + N' else 4'
+ s.crlf + N' end as __$operation,'
+ s.crlf + N' NULL as [__$update_mask]'
+ (
select N','
+ s.crlf + N' l.' + quotename(col.column_name) as [text()]
from cdc.captured_columns col
where col.object_id = ct.object_id
order by col.column_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' from ('
+ s.crlf + N' select top (1) NULL as c'
+ s.crlf + N' from ('
+ s.crlf + N' select'
+ s.crlf + N' lower(rtrim(ltrim(@row_filter_option))),'
+ s.crlf + N' N' + quotename( ct.capture_instance, '''')
+ s.crlf + N' ) p ([filter_option], [capture_instance])'
+ s.crlf + N' where p.[filter_option] = N''all'''
+ s.crlf + N' and [sys].[fn_cdc_check_parameters]( p.[capture_instance], @from_lsn, @to_lsn, p.filter_option, 1) = 1'
+ s.crlf + N' ) c'
+ s.crlf + N' cross join ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l -- Last operation per key value.'
+ s.crlf + N' inner join ('
+ s.crlf + N' select'
+ (
select
+ s.crlf + N' m.' + quotename(ic.column_name) + ',' as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' max(m.__$seqval) as __$seqval_max, -- Sequence value for Last operation.'
+ s.crlf + N' min(m.__$seqval) as __$seqval_min -- Sequence value for First operation.'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' m'
+ s.crlf + N' where m.__$start_lsn >= @from_lsn'
+ s.crlf + N' and m.__$start_lsn <= @to_lsn'
+ s.crlf + N' and m.__$operation in (1,2,4)'
+ s.crlf + N' group by'
+ stuff((
select
N','
+ s.crlf + N' m.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 1, '')
+ s.crlf + N' ) x on ('
+ stuff((
select N' and x.' + quotename(ic.column_name) + ' = l.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 5, '') + N' and x.__$seqval_max = l.__$seqval)'
+ s.crlf + N' where l.__$start_lsn >= @from_lsn'
+ s.crlf + N' and l.__$start_lsn <= @to_lsn'
+ s.crlf + N' and (l.__$operation = 4 -- New values for updates.'
+ s.crlf + N' or l.__$operation = 2 -- Inserted values or New values for defered updates.'
+ s.crlf + N' or (l.__$operation = 1 -- Deleted values or Old values for defered updates.'
+ s.crlf + N' and not exists ('
+ s.crlf + N' -- Exclude all deletes that are the old values part of a defered update.'
+ s.crlf + N' -- We will only use the other part of the defered update, the row with'
+ s.crlf + N' -- l.__$operation = 2.'
+ s.crlf + N' select *'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l3'
+ s.crlf + N' where'
+ stuff((
select
+ s.crlf + N' and l3.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 32, '')
+ s.crlf + N' and l3.__$seqval = x.__$seqval_max'
+ s.crlf + N' and l3.__$operation = 2'
+ s.crlf + N' )'
+ s.crlf + N' and not exists ('
+ s.crlf + N' -- If there is only 1 operation for row ''l'', we don''t need to check for previous'
+ s.crlf + N' -- actions (hence the not x.__$seqval_max = x.__$seqval_min). But if multiple'
+ s.crlf + N' -- sequences do exist, we need to exclude those deletes for rows that were'
+ s.crlf + N' -- first inserted within this same range (__$seqval_min). Net result should'
+ s.crlf + N' -- look as if the row never existed at all. But again, we need to account for'
+ s.crlf + N' -- defered updates here too: if both __$operation = 1 and __$operation = 2'
+ s.crlf + N' -- exists the first operation is a defered update instead of an insert and'
+ s.crlf + N' -- we should include the delete(1) operation l.'
+ s.crlf + N' select *'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
+ s.crlf + N' where not x.__$seqval_max = x.__$seqval_min'
+ (
select
+ s.crlf + N' and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' and l4.__$seqval = x.__$seqval_min'
+ s.crlf + N' and l4.__$operation in (1, 2)'
+ s.crlf + N' having min(l4.__$operation) = 2'
+ s.crlf + N' )'
+ s.crlf + N' )'
+ s.crlf + N' )'
+ s.crlf + N''
+ s.crlf + N' union all'
+ s.crlf + N''
+ s.crlf + N' select'
+ s.crlf + N' l.[__$start_lsn],'
+ s.crlf + N' case'
+ s.crlf + N' when l.__$operation = 1'
+ s.crlf + N' then 1'
+ s.crlf + N' when exists ('
+ s.crlf + N' select *'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
+ s.crlf + N' where'
+ stuff((
select
+ s.crlf + N' and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 26, '')
+ s.crlf + N' and l4.__$seqval = x.__$seqval_min'
+ s.crlf + N' and l4.__$operation in (1, 2)'
+ s.crlf + N' having min(l4.__$operation) = 2'
+ s.crlf + N' )'
+ s.crlf + N' then 2'
+ s.crlf + N' else 4'
+ s.crlf + N' end as __$operation,'
+ s.crlf + N' x.__$update_mask'
+ (
select N','
+ s.crlf + N' l.' + quotename(col.column_name) as [text()]
from cdc.captured_columns col
where col.object_id = ct.object_id
order by col.column_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' from ('
+ s.crlf + N' select top (1) NULL as c'
+ s.crlf + N' from ('
+ s.crlf + N' select'
+ s.crlf + N' lower(rtrim(ltrim(@row_filter_option))),'
+ s.crlf + N' N' + quotename( ct.capture_instance, '''')
+ s.crlf + N' ) p ([filter_option], [capture_instance])'
+ s.crlf + N' where p.[filter_option] = N''all with mask'''
+ s.crlf + N' and [sys].[fn_cdc_check_parameters]( p.[capture_instance], @from_lsn, @to_lsn, p.filter_option, 1) = 1'
+ s.crlf + N' ) c'
+ s.crlf + N' cross join ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l -- Last operation per key value.'
+ s.crlf + N' inner join ('
+ s.crlf + N' select'
+ (
select
+ s.crlf + N' m.' + quotename(ic.column_name) + ',' as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' max(m.__$seqval) as __$seqval_max, -- Sequence value for Last operation.'
+ s.crlf + N' min(m.__$seqval) as __$seqval_min, -- Sequence value for First operation.'
+ s.crlf + N' [sys].[ORMask](m.__$update_mask) as __$update_mask -- bitwise OR of all update_masks.'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' m'
+ s.crlf + N' where m.__$start_lsn >= @from_lsn'
+ s.crlf + N' and m.__$start_lsn <= @to_lsn'
+ s.crlf + N' and m.__$operation in (1,2,4)'
+ s.crlf + N' group by'
+ stuff((
select
N','
+ s.crlf + N' m.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 1, '')
+ s.crlf + N' ) x on ('
+ stuff((
select N' and x.' + quotename(ic.column_name) + ' = l.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 5, '') + N' and x.__$seqval_max = l.__$seqval)'
+ s.crlf + N' where l.__$start_lsn >= @from_lsn'
+ s.crlf + N' and l.__$start_lsn <= @to_lsn'
+ s.crlf + N' and (l.__$operation = 4 -- New values for updates.'
+ s.crlf + N' or l.__$operation = 2 -- Inserted values or New values for defered updates.'
+ s.crlf + N' or (l.__$operation = 1 -- Old values for defered updates or deletes.'
+ s.crlf + N' and not exists ('
+ s.crlf + N' -- Exclude all deletes that are the old values part of a defered update.'
+ s.crlf + N' -- We only use the other part of the defered update, the row with __$operation = 2.'
+ s.crlf + N' select *'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l3'
+ s.crlf + N' where'
+ stuff((
select
+ s.crlf + N' and l3.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)'), 1, 32, '')
+ s.crlf + N' and l3.__$seqval = x.__$seqval_max'
+ s.crlf + N' and l3.__$operation = 2'
+ s.crlf + N' )'
+ s.crlf + N' and not exists ('
+ s.crlf + N' -- If there is only 1 sequence for row ''l'', we don''t need to check for previous'
+ s.crlf + N' -- actions (hence the not x.__$seqval_max = x.__$seqval_min). But if multiple'
+ s.crlf + N' -- sequences do exist, we need to exclude those deletes for rows that were'
+ s.crlf + N' -- first inserted within this same range (__$seqval_min). Net result should'
+ s.crlf + N' -- look as if the row never existed at all. But again, we need to account for'
+ s.crlf + N' -- defered updates here too: if both __$operation = 1 and __$operation = 2'
+ s.crlf + N' -- exists the first operation is a defered update instead of an insert and'
+ s.crlf + N' -- we should include the delete(1) operation l.'
+ s.crlf + N' select *'
+ s.crlf + N' from ' + quotename(object_schema_name(ct.object_id)) + '.' + quotename(object_name(ct.object_id)) + ' l4'
+ s.crlf + N' where not x.__$seqval_max = x.__$seqval_min'
+ (
select
+ s.crlf + N' and l4.' + quotename(ic.column_name) + ' = x.' + quotename(ic.column_name) as [text()]
from cdc.index_columns ic
where ic.object_id = ct.object_id
order by ic.index_ordinal
for xml path(''), type
).value('.','nvarchar(max)')
+ s.crlf + N' and l4.__$seqval = x.__$seqval_min'
+ s.crlf + N' and l4.__$operation in (1, 2)'
+ s.crlf + N' having min(l4.__$operation) = 2'
+ s.crlf + N' )'
+ s.crlf + N' )'
+ s.crlf + N' )' as [definition]
into #definitions
from (
select char(0x0d) + char(0x0a)
) s (crlf)
cross join cdc.change_tables ct
where ct.supports_net_changes = 1;
if object_id('tempdb..#todo') is not null
drop table #todo;
raiserror('Comparing to existing functions...', 0, 0) with nowait;
select
d.functionName,
case when object_id(d.functionName) is null
then 'Missing'
else
case when nullif(checksum(x.txtNew),checksum(x.txtOld)) is not null
then 'Different'
else 'OK'
end
end as [Status],
d.definition
into #todo
from #definitions d
cross apply (
select
replace(
replace(
replace(
replace(
replace(
replace(
d.definition,
char(0x0d), ' '),
char(0x0a), ' '),
char(9), ' '),
' ', ' ' + char(9)),
char(9) + ' ', ''),
char(9), '') as txtNew,
replace(
replace(
replace(
replace(
replace(
replace(
(select m.definition from sys.sql_modules m where object_id = object_id(d.functionName)),
char(0x0d), ' '),
char(0x0a), ' '),
char(9), ' '),
' ', ' ' + char(9)),
char(9) + ' ', ''),
char(9), '') as txtOld
) x;
declare cur cursor local fast_forward
for
select
t.functionName,
case t.Status
when 'Different' then replace(t.definition, 'CREATE FUNCTION', 'ALTER FUNCTION')
when 'Missing' then t.definition
end
from #todo t
where nullif('OK', t.Status) is not null
order by t.functionName;
open cur;
while 1 = 1
begin
declare @functionName sysname;
declare @stmt nvarchar(max);
fetch next from cur into @functionName, @stmt;
if @@fetch_status = -1
break;
if @@fetch_status = 0
begin
raiserror('Processing %s...', 0, 0, @FunctionName) with nowait;
exec sp_executesql @stmt;
end
end
close cur;
deallocate cur;
raiserror('Done.', 0, 0) with nowait;
end
exec [dbo].[spGenerateCustomCDCnetFunctions]
Viewing 12 posts - 1 through 11 (of 11 total)
You must be logged in to reply to this topic. Login to reply