SQLServerCentral Article

Conditional Set-Based Processing: Moving Towards a Best Practice for ETL


By the end of this article, you might think that what I'm saying here is just common sense. The problem is that this common sense is sadly missing in many situations. This article is based on many years of setting up, troubleshooting, and trying to unravel the path of data loads for data marts, archives, and other data consolidations. One of the tasks that still falls primarily to a DBA is handling the movement of large amounts of data from one data source to another. The data loads may be scheduled daily, run ad hoc, or, as in a recent project, happen every fifteen minutes. For the purposes of this article, it is immaterial what the data source may be: another database, text files, or XML files. The recommendations in this article do not apply if you are using SSIS packages, since conditional processing and error handling are built into the Data Flow Task. If, however, you are regularly using stored procedures and/or managed code to load large amounts of data, you may find this article relevant.

A recent project involved setting up an ETL process that could handle transforming 50,000 to 100,000 records in multiple tables every fifteen minutes from an OLTP system. The initial steps of the ETL included replication to a staging area where SQL Agent jobs running stored procedures could process the staged records. Requirements were to make the ETL process as fault-tolerant or "bullet-proof" as possible so that intervention and troubleshooting required would be minimal. In addition, we should be able to produce user reports on any data that could not load for any reason.

We started by creating stored procedures for each related set of staging tables that could handle the transformations and calculations to produce the loadable version of the data. Anyone who has worked on projects to load data in SQL Server knows that if you try to insert 10,000 records into a table using one SQL statement, you are throwing the dice. If one record in the 10,000-record set causes an error and cannot be loaded and you are not running within an explicit transaction, you can have an interesting time looking for the issue and figuring out which records were loaded before the process failed.

Of course, no one should try this without an explicit transaction, but we have all seen it done. If you load the 10,000 records within an explicit transaction and an error occurs, you can roll back and maintain data consistency. However, you still have not loaded the records, and now you need a "data minder," which would not be consistent with the requirements for minimal intervention.

To meet our requirements for fault tolerance and minimal intervention, we decided to loop through the records in the staging tables and create a Try-Catch around each row to be loaded. In this way, we thought to make sure that errors were handled and recorded and that the record processing would not be halted by a batch error. Records that could not load were written to XML along with any error messages and the loop would proceed without interruption. This produced a very fault-tolerant system with the required error reporting.

Unfortunately, using the RBAR (row-by-agonizing-row) method, it was too slow! We were exceeding the time window we had to get the data loaded. However, if we used set-based processing to speed up data loads and load complete data sets at a time, we would lose fault tolerance.

When we hit on the solution, we were surprised we didn't implement it from the start because it seemed so obvious. To meet all our requirements, we implemented Conditional Set-Based Processing. We added code to load the entire data sets from staging tables, performing transformations and calculations during the insert, but the important part was managing the Try-Catch and the explicit transaction around the set-based data load. In the Catch block of the set-based data load, we put the entire Try-Catch of the RBAR processing code. If an error occurred while loading 50,000 records, the code rolled back all set-based processing of the records and began processing them one at a time. As a result, only if an error was encountered did we revert to the fault-tolerant, row-by-row method. Otherwise, we would use SQL Server's excellent speed for processing data sets and meet our requirements for time to load.

To demonstrate, I have included some code examples below. The first set of code sets up some sample tables -- TargetAddress, SourceAddress, LookupStateProvince, and ETLErrorLog -- and some source data that can be loaded into the SourceAddress table. Some of the source data will violate the foreign key or not null constraints of the TargetAddress table. The ETLErrorLog will receive the error information on any record that cannot be loaded.

/******BEGIN TABLE/DATA SET UP********/

SET NOCOUNT ON
IF EXISTS (SELECT * FROM sys.foreign_keys WHERE object_id = OBJECT_ID(N'[dbo].[FK_TargetAddress_LookupStateProvince_StateProvinceID]') AND parent_object_id = OBJECT_ID(N'[dbo].[TargetAddress]'))
ALTER TABLE [dbo].[TargetAddress] DROP CONSTRAINT [FK_TargetAddress_LookupStateProvince_StateProvinceID]
GO
IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[TargetAddress]') AND type in (N'U'))
DROP TABLE [dbo].[TargetAddress]
GO
CREATE TABLE dbo.[TargetAddress]
(
 AddressID int IDENTITY(1,1) NOT NULL,
 AddressLine1 nvarchar(60) NOT NULL,
 AddressLine2 nvarchar(60) NULL,
 City nvarchar(30) NOT NULL,
 StateProvinceID int NOT NULL,
 PostalCode nvarchar(15) NOT NULL,
 CONSTRAINT [PK_TargetAddress_AddressID] PRIMARY KEY CLUSTERED 
( [AddressID] ASC ) 
) ON [PRIMARY]
IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[LookupStateProvince]') AND type in (N'U'))
DROP TABLE [dbo].[LookupStateProvince]
GO
CREATE TABLE dbo.[LookupStateProvince]
(
StateProvinceID int IDENTITY(1,1) NOT NULL,
StateProvinceCode nchar(3) NOT NULL,
 CONSTRAINT [PK_LookupStateProvince_StateProvinceID] PRIMARY KEY CLUSTERED 
( [StateProvinceID] ASC )
) ON [PRIMARY]
ALTER TABLE dbo.[TargetAddress] WITH CHECK ADD CONSTRAINT [FK_TargetAddress_LookupStateProvince_StateProvinceID] FOREIGN KEY([StateProvinceID])
REFERENCES dbo.[LookupStateProvince] ([StateProvinceID])
GO
 
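--Name the target column explicitly; StateProvinceID is generated by the IDENTITY property
INSERT INTO dbo.LookupStateProvince (StateProvinceCode)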
SELECT 'NY'
UNION SELECT 'NJ'
UNION SELECT 'NV'
UNION SELECT 'NB'

After inserting this data, any record inserted into the TargetAddress table must have a value of 1, 2, 3, or 4 in the StateProvinceID column because of the foreign key reference.
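
As a quick check of that constraint, the sketch below attempts a single insert with a StateProvinceID of 9, which has no matching row in LookupStateProvince. It assumes the tables above were created as shown; the literal values are only illustrative. The insert should fail with a foreign key conflict (error 547 in SQL Server), and the CATCH block simply returns the error details instead of stopping the batch.

```sql
BEGIN TRY
    -- StateProvinceID 9 does not exist in dbo.LookupStateProvince,
    -- so the foreign key on dbo.TargetAddress should reject this row.
    INSERT INTO dbo.TargetAddress
        (AddressLine1, AddressLine2, City, StateProvinceID, PostalCode)
    VALUES
        (N'PO BOX 5', NULL, N'LAS VEGAS', 9, N'90056');
END TRY
BEGIN CATCH
    -- Return the error instead of letting it abort the batch.
    SELECT ERROR_NUMBER()  AS ErrorNumber,
           ERROR_MESSAGE() AS ErrorMessage;
END CATCH
```

This is the same kind of error that the last of the failing source records below is designed to trigger during the load.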

Next we create a source table that represents the data that we need to load. In actual practice, this might be a temporary table, a permanent staging table, an OPENROWSET query to another data source, or any other table-valued data source.

We can assume that the source might contain data that is not loadable because it is missing required values or contains data that does not conform to referential integrity.

IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[SourceAddress]') AND type in (N'U'))
DROP TABLE [dbo].[SourceAddress]
GO
CREATE TABLE dbo.[SourceAddress]
(SourceRowIdentifier int IDENTITY(1579,1) NOT NULL,
 SourceAddressLine1 nvarchar(60) NULL,
 SourceAddressLine2 nvarchar(60) NULL,
 SourceCity nvarchar(30) NULL,
 SourceStateProvinceID int NULL,
 SourcePostalCode nvarchar(15) NULL,
 LoadTime datetime NULL,
 BatchID int NULL
)
/**THE DATA RECORDS BELOW HAVE DATA THAT WILL VIOLATE TargetAddress CONSTRAINTS, SO WILL REVERT TO ROW-BY-ROW PROCESSING */
INSERT INTO dbo.[SourceAddress]
( SourceAddressLine1, SourceAddressLine2,
 SourceCity, SourceStateProvinceID,
 SourcePostalCode)
SELECT 'PO BOX 1','RTE 1', 'MILLVILLE', 4, '11011'
UNION SELECT 'PO BOX 2','RTE 2', 'PLEASANTVILLE',NULL,'10101'
UNION SELECT NULL,'RTE 3', 'HUDSON',2,'10202'
UNION SELECT 'PO BOX 4', 'RTE 4',NULL,3,'87654-1011'
UNION SELECT 'PO BOX 5',NULL,'LAS VEGAS',9,'90056'
/**THE FOLLOWING DATA RECORDS WILL NOT GENERATE ERRORS, SO WILL BE PROCESSED COMPLETELY BY SET-BASED STATEMENTS
You can test with these records and see the difference
TRUNCATE TABLE dbo.[SourceAddress]
TRUNCATE TABLE dbo.[ETLErrorLog]
INSERT INTO dbo.[SourceAddress]
( SourceAddressLine1, SourceAddressLine2,
 SourceCity, SourceStateProvinceID,
 SourcePostalCode)
SELECT 'PO BOX 1','RTE 1', 'MILLVILLE', 4, '11011'
UNION SELECT 'PO BOX 2','RTE 2', 'PLEASANTVILLE',3,'10101'
UNION SELECT 'PO BOX 3','RTE 3', 'HUDSON',2,'10202'
UNION SELECT 'PO BOX 4', 'RTE 4','STANTON',3,'87654-1011'
UNION SELECT 'PO BOX 5',NULL,'LAS VEGAS',1,'90056'
**/
--Create an errors table to hold ETL errors
IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[ETLErrorLog]') AND type in (N'U'))
DROP TABLE [dbo].[ETLErrorLog]
GO
CREATE TABLE dbo.[ETLErrorLog](
 [ErrorLogID] [int] IDENTITY(1,1) NOT NULL,
 [ErrorCode] [int] NULL,
 [ErrorText] [nvarchar](4000) NULL,
 [ErrorProcedure] [nvarchar](126) NULL,
 [ErrorSourceID] int NULL,
 [ErrorSourceData] xml NULL,
 [ErrorTime] [datetime] NOT NULL CONSTRAINT [DF_ETLErrorLog_ErrorTime] DEFAULT (getdate()),
 CONSTRAINT [PK_ETLErrorLog] PRIMARY KEY CLUSTERED 
( [ErrorLogID] ASC )
) ON [PRIMARY]

The next code snippet contains the conditional set-based processing code. Normally this code would be part of a stored procedure, but to simplify the example, I have provided it as a sample that you can copy and paste into a query window and run. The end of the code example selects from the errors table so you can see the results of processing the batch.

/****BEGIN ETL CODE EXAMPLE*********/
--The following code would be compiled within a stored procedure
--Create an ID to update the source data (if required)
DECLARE @BatchID int
SELECT @BatchID = 100
DECLARE @CurrentTime datetime
DECLARE @TotalInserts int
DECLARE @Counter int
--Variables needed if conditional set-based processing reverts to row-by-row processing
DECLARE @RowID int,
 @AddressLine1 nvarchar(60),
 @AddressLine2 nvarchar(60),
 @City nvarchar(30),
 @StateProvinceId int,
 @PostalCode nvarchar(15)
DECLARE @XMLDoc xml
--Declare temporary table to hold source data
DECLARE @TempSource TABLE
(RowID int PRIMARY KEY,
 AddressLine1 nvarchar(60),
 AddressLine2 nvarchar(60),
 City nvarchar(30),
 StateProvinceID int,
 PostalCode nvarchar(15)
)
INSERT INTO @TempSource
(RowID,AddressLine1,AddressLine2,
 City, StateProvinceID, PostalCode)
SELECT 
SourceRowIdentifier,
SourceAddressLine1, SourceAddressLine2,
 SourceCity, SourceStateProvinceID,
 SourcePostalCode
FROM dbo.SourceAddress
WHERE BatchID IS NULL
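--BEGIN CODE SECTION FOR SET-BASED PROCESSING 
--Set the load time before the TRY block so the row-by-row fallback does not write a NULL LoadTime
SET @CurrentTime = getdate()
SET @TotalInserts = (SELECT COUNT(*) FROM @TempSource)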
IF @TotalInserts > 0
BEGIN
 
BEGIN TRY
BEGIN TRAN
INSERT INTO dbo.TargetAddress
 (AddressLine1,
 AddressLine2,
 City, 
 StateProvinceID, 
 PostalCode)
SELECT 
 AddressLine1,
 AddressLine2,
 City, 
 StateProvinceID, 
 PostalCode
FROM @TempSource
SET @CurrentTime = getdate()
UPDATE dbo.SourceAddress
 SET LoadTime = @CurrentTime,
 BatchID = @BatchID
 FROM dbo.SourceAddress S JOIN @TempSource T 
 ON T.RowID = S.SourceRowIdentifier
IF @@TRANCOUNT > 0 
 COMMIT TRAN
--Don't delete from temp table until transaction is committed
 DELETE @TempSource
PRINT 'Source Records processed in set-based statements'
END TRY
BEGIN CATCH --Outer catch block
--Rollback all data. This does not delete or remove the table variable and its data.
 IF @@TRANCOUNT > 0
 ROLLBACK
--Record a single error in the errors table. 
 --We may not know which record caused the batch to fail, but we record the failure.
INSERT INTO dbo.ETLErrorLog
 ( ErrorCode,
 ErrorText,
 ErrorProcedure,
 ErrorSourceID
 ) 
 SELECT ERROR_NUMBER(),
 ERROR_MESSAGE(),
 ERROR_PROCEDURE(),
 @BatchID
 
--We still have our staged records in the @TempSource table
 SELECT TOP 1
 @RowID = RowID,
 @AddressLine1 = AddressLine1,
 @AddressLine2 = AddressLine2,
 @City = City,
 @StateProvinceId = StateProvinceID,
 @PostalCode = PostalCode
 FROM @TempSource
SET @Counter = 1
 -- LOOP through each record in the batch. We previously got the total count.
 WHILE @Counter <= @TotalInserts
 BEGIN
BEGIN TRY
 --NVARCHAR(12) holds any int value; NVARCHAR(4) raises a conversion error once RowID exceeds 9999
 PRINT 'Processing record '+ CAST(@RowID AS NVARCHAR(12)) +' in one-at-a-time loop...'
 BEGIN TRAN
 
 INSERT INTO dbo.TargetAddress
 (AddressLine1,
 AddressLine2,
 City, 
 StateProvinceID, 
 PostalCode)
 SELECT 
 @AddressLine1,
 @AddressLine2,
 @City, 
 @StateProvinceID, 
 @PostalCode
UPDATE dbo.SourceAddress
 SET LoadTime = @CurrentTime,
 BatchID = @BatchID
 WHERE SourceRowIdentifier = @RowID
--Commit each record one-at-a-time, updating the Source table that it has been processed, catching the errors
 IF @@TRANCOUNT > 0
 COMMIT TRAN
END TRY
 BEGIN CATCH
--Roll back only the processing of the one record
 IF @@TRANCOUNT > 0
 ROLLBACK
SELECT @XMLDoc =
 (SELECT RowID,
 AddressLine1,
 AddressLine2,
 City, 
 StateProvinceID, 
 PostalCode
 FROM @TempSource
 WHERE RowID = @RowID
 FOR XML PATH (''))
INSERT INTO dbo.ETLErrorLog
 ( ErrorCode,
 ErrorText,
 ErrorProcedure,
 ErrorSourceID,
 ErrorSourceData
 ) 
 SELECT ERROR_NUMBER(),
 ERROR_MESSAGE(),
 ERROR_PROCEDURE(),
 @RowID,
 @XMLDoc
 
--You may wish to add code here to limit the number of times a failed record attempts to load in subsequent batches
 --Failed records can be moved to another table, flagged, or deleted, depending on business rules
END CATCH --Nested catch block
DELETE @TempSource
 WHERE @RowID = RowID
 
 --Continue getting data for WHILE loop
 SELECT @Counter = @Counter+1 
 SELECT @RowID = NULL, @AddressLine1 = NULL, @AddressLine2 = NULL, @City = NULL, @StateProvinceID = NULL, @PostalCode = NULL
SELECT TOP 1
 @RowID = RowID,
 @AddressLine1 = AddressLine1,
 @AddressLine2 = AddressLine2,
 @City = City,
 @StateProvinceId = StateProvinceID,
 @PostalCode = PostalCode
 FROM @TempSource
 
END --While Loop for Row-by-row processing
END CATCH --Initial catch block
END --If Total_Inserts > 0
SELECT * FROM dbo.ETLErrorLog
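
The requirements also called for user reports on records that could not load. As a rough sketch, and assuming the row-level XML has the shape produced by the FOR XML PATH ('') query above (one top-level element per non-NULL column, no root element), a query along these lines could shred ErrorSourceData back into columns. The column aliases are illustrative and not part of the original solution.

```sql
-- Report sketch: one row per failed source record.
-- FOR XML PATH('') omits elements for NULL columns, so a missing
-- element comes back as NULL here, which helps spot NOT NULL violations.
SELECT  E.ErrorLogID,
        E.ErrorTime,
        E.ErrorCode,
        E.ErrorText,
        E.ErrorSourceData.value('(/RowID)[1]',           'int')          AS SourceRowID,
        E.ErrorSourceData.value('(/AddressLine1)[1]',    'nvarchar(60)') AS AddressLine1,
        E.ErrorSourceData.value('(/AddressLine2)[1]',    'nvarchar(60)') AS AddressLine2,
        E.ErrorSourceData.value('(/City)[1]',            'nvarchar(30)') AS City,
        E.ErrorSourceData.value('(/StateProvinceID)[1]', 'int')          AS StateProvinceID,
        E.ErrorSourceData.value('(/PostalCode)[1]',      'nvarchar(15)') AS PostalCode
FROM    dbo.ETLErrorLog AS E
WHERE   E.ErrorSourceData IS NOT NULL   -- skips the batch-level row, which has no XML
ORDER BY E.ErrorLogID;
```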
 

To fine-tune this method, I would add one other piece to the solution if the amount of data to be loaded per set exceeds roughly 20,000 to 50,000 records: a throttle mechanism that batches the data set loads into manageable chunks. Whether it is needed depends on the server memory, processing power, and load on the server. If an average of 100,000 records were available to be loaded at one time, it might be suitable to process 25,000 at a time.

In this way, if the conditional set-based processing failed, you would revert to row-by-row processing of only those 25,000 records, not the entire 100,000. Also, based on the typical error rate of the data load, you could adjust the batch size up or down: if errors were more frequent, you could reduce it to 1,000 records at a time and still gain some processing speed even if only 20% of the batches made it through without errors.
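
One way such a throttle might look is sketched below. It is not part of the original solution: the @BatchSize, @LastRowID, @RowsInChunk, and @Chunk names are hypothetical, and the sketch assumes SourceRowIdentifier values are positive and increasing, as the IDENTITY column in the sample table provides. A high-water mark is used instead of looping until no unprocessed rows remain, because failed records keep a NULL BatchID and would otherwise be picked up again in the same run.

```sql
DECLARE @BatchSize int      -- assumed chunk size; tune for the server
DECLARE @LastRowID int      -- high-water mark of rows already handed out
DECLARE @RowsInChunk int
DECLARE @Chunk TABLE (RowID int PRIMARY KEY)

SET @BatchSize = 25000
SET @LastRowID = 0
SET @RowsInChunk = 1

WHILE @RowsInChunk > 0
BEGIN
    DELETE @Chunk

    -- Take the next chunk of unprocessed rows beyond the high-water mark
    INSERT INTO @Chunk (RowID)
    SELECT TOP (@BatchSize) SourceRowIdentifier
    FROM dbo.SourceAddress
    WHERE BatchID IS NULL
      AND SourceRowIdentifier > @LastRowID
    ORDER BY SourceRowIdentifier

    SET @RowsInChunk = @@ROWCOUNT

    IF @RowsInChunk > 0
    BEGIN
        SELECT @LastRowID = MAX(RowID) FROM @Chunk

        -- Placeholder: run the conditional set-based load from the ETL
        -- example against only the rows listed in @Chunk (set-based TRY
        -- first, row-by-row fallback in the CATCH block).
        PRINT 'Chunk ending at row ' + CAST(@LastRowID AS nvarchar(12))
            + ' contains ' + CAST(@RowsInChunk AS nvarchar(12)) + ' rows'
    END
END
```

With this shape, an error in one chunk sends only that chunk through the row-by-row path, while the remaining chunks still get a set-based attempt.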
