SQLServerCentral Article

Concurrent processing in SSIS – Part 2


In the first part we covered some simple and some more complex methods for getting SSIS to run more than one process at a time. Before we get into the methods covered here, please execute the “TempTableBuildPopulate.sql” script from the sample project if you have not already done so. It creates the control structures and sample data for all of the examples; the sample data includes up to 55 random id values. You will need to place the sample project in C:\Temp, because later examples reference their child packages by hard-coded paths. As a design point, all of the logic in these examples is contained in the packages. In a production build I would suggest moving the larger blocks of logic into stored procedures and calling those from the packages. That was not done here, for clarity.

The next two examples look a lot like the previous ones, but they function very differently: instead of For Each Loops they use For Loops. As in the other examples, “Generate Data” cleans up the control structures, but this time there is no need for a natural or assigned allocation of the work. The process doesn’t care which container handles which record. We only limit the number of running processes, and that number determines how many For Loops are used. This example is called “ParentSingleChildMethod.dtsx”:

Large Package

This method depends heavily on the control structure described previously. In our example, ClientId holds the key that we are going to perform “work” on, and the other two fields, LockStartDt and ProcessDt, are the important elements. A non-null ProcessDt indicates that the record has been processed and is therefore complete. Setting it is the last step in the child package.

LockStartDt is where much of the magic happens. It lets us “lock” a ClientId and recover when a child package dies and leaves a record unprocessed. We will cover that logic at length when we look at the child package. The control structure for this example and the next is:

CREATE TABLE [dbo].[tempWorkLoadParentSingleChildMethod](
[ClientId] [int] NOT NULL,
[LockStartDt] [datetime] NULL,
[ProcessDt] [datetime] NULL,
CONSTRAINT [PK_tempWorkLoadParentSingleChildMethod]
PRIMARY KEY CLUSTERED ( [ClientId] ASC ) )
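The meaning of the two date columns can be summarized as a small state function. This is a minimal Python sketch, not code from the sample project. It assumes the 10-minute expiry used by the child package and represents times as plain numbers of seconds. The name row_state and its state labels are hypothetical. "Available" and "expired" rows are the ones a child may claim.

```python
from typing import Optional

# Assumption: times are plain numbers of seconds; 600 s mirrors @LockExpireMinutes = 10.
LOCK_EXPIRE_SECONDS = 10 * 60


def row_state(lock_start: Optional[float], process_dt: Optional[float], now: float) -> str:
    """Classify a control-table row from its LockStartDt and ProcessDt values."""
    if process_dt is not None:
        return "complete"   # a non-null ProcessDt means the record is done
    if lock_start is None:
        return "available"  # never claimed by a child package
    if lock_start + LOCK_EXPIRE_SECONDS < now:
        return "expired"    # claimed, but the lock was not refreshed in time; claimable again
    return "locked"         # claimed by a child that is presumably still working


print(row_state(None, None, now=1000.0))   # available
print(row_state(100.0, None, now=500.0))   # locked
print(row_state(100.0, None, now=701.0))   # expired
```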

The variable setup for this example is straightforward. Six variables are defined at the package level:

  • User::WorkCount
  • User::FL_1_RecCount
  • User::FL_2_RecCount
  • User::FL_3_RecCount
  • User::FL_4_RecCount
  • User::FL_5_RecCount

“User::WorkCount” contains the total number of records that we will be processing. Because we are using For Loops, each loop compares its own count against this total, which keeps it running until there is no more work to do. The other five variables, one per For Loop, hold how many records have been processed (ProcessDt is not NULL) after each execution of the child package.

Package Variables

The Execute SQL task “Get Work Count” sets up the For Loops, each of which keeps its own count of processed records. The task does nothing more than count the total number of records in the control structure and store the result in “User::WorkCount”.
Get Work Task

The next set of components looks a lot like what we have seen before, but there are big differences. Because we are using a For Loop instead of a For Each Loop, no container can know at design time which records it will be responsible for at run time. Each For Loop contains an Execute Package task and an Execute SQL task. The Execute Package task is exactly the same in every For Loop, and the Execute SQL task is the same except for the variable it writes to. Notice that the precedence constraint between “Execute Package Task” and “Get Current Work Count” is “On Completion”, because we want the count to run regardless of the outcome of the previous step.

ETL Process

The For Loop doesn’t require much setup. The only property we use is EvalExpression; as long as it evaluates to true, the loop keeps running. The condition is very simple. @WorkCount is the total number of records in the control structure, and @FL_1_RecCount, the number of processed records, is recalculated by the “Get Current Work Count” step after each run of the Execute Package task. As long as the processed count is below the total, there is work to do and the loop continues.
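To make the loop’s termination rule concrete, here is a small Python simulation of one For Loop container. It is an illustration under assumptions. The EvalExpression is taken to be @FL_n_RecCount < @WorkCount. run_child stands in for the Execute Package task and count_processed for the “Get Current Work Count” query. All names are hypothetical.

```python
from typing import Callable


def for_loop_container(work_count: int,
                       run_child: Callable[[], None],
                       count_processed: Callable[[], int]) -> int:
    """Simulate one For Loop; return how many times the child package ran.

    Assumes the EvalExpression is @FL_n_RecCount < @WorkCount.
    """
    rec_count = 0  # FL_n_RecCount starts below WorkCount, so the loop is entered
    runs = 0
    while rec_count < work_count:          # EvalExpression
        try:
            run_child()                    # Execute Package task
        except Exception:
            pass                           # a failed child does not end the loop
        runs += 1
        rec_count = count_processed()      # "Get Current Work Count" (On Completion)
    return runs


# Demo: five records; every third child run dies before finishing its record.
pending = [1, 2, 3, 4, 5]
processed = set()
attempts = [0]


def flaky_child() -> None:
    attempts[0] += 1
    if attempts[0] % 3 == 0:
        raise RuntimeError("child package died")
    processed.add(pending.pop(0))


print(for_loop_container(len(pending), flaky_child, lambda: len(processed)))  # 7
```

Because the re-count follows an On Completion constraint, a failed child run still lets the loop re-check its exit condition. It simply tries again on the next pass.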

For Loop properties

As with the For Each Loops, we set MaximumErrorCount to 5000 so that failed child runs do not fail the container.

For loop Error Count

The Execute Package task is identical in every For Loop, and its setup is straightforward. We tell it that the package is stored in the “File system” and to use the “SingleChild_Package” connection, and we set ExecuteOutOfProcess to “True” so that each child runs in its own process. The “SingleChild_Package” connection does little more than point to “C:\TEMP\Multi_Run_Sample\SingleChild.dtsx”.

Package Properties

The Execute SQL task runs whether or not the Execute Package task succeeds, and it retrieves the current number of processed records. We set its result set to “Single row”.

Execute SQL Task Editor

The SQLStatement is very simple:

SELECT WorkCount=COUNT(*)
FROM [tempdb].[dbo].[tempWorkLoadParentSingleChildMethod]
WHERE ProcessDt IS NOT NULL
GO

We map the result set into the variable appropriate to the For Loop, in this case “User::FL_1_RecCount”.
SQL Task Result Set

So far we have only looked at the process that gets the child package running. Now let’s look at the child package itself, “SingleChild.dtsx”:

Package Designer

This package gets a ClientId in the “Get Client Id For This Run” task.
Get ClientID Task

The code to get a ClientId is fairly complex, so it is commented inline:

DECLARE @ClientId         int,
         @NoDataAvailFlag   tinyint,
         @LockExpireMinutes int
DECLARE @LockWk TABLE ( ProcessDt datetime, LockStartDt datetime )
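SET @LockExpireMinutes = 10
SET @NoDataAvailFlag = 1 -- We assume we don't have data available.
 GetNewClientId:
 -- Discard the lock snapshot from any earlier attempt. A stale row left in
 -- @LockWk could later look expired and let us claim a freshly locked record.
 DELETE FROM @LockWk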
 -- Get the minimum client id that has not been processed and has not timed out.
 -- Timeout is 10 minutes, the process MUST update the lock at least every 10m.
 SELECT @ClientId = MIN( ClientId )
 FROM   dbo.tempWorkLoadParentSingleChildMethod
 WHERE  ProcessDt IS NULL
 AND    (LockStartDt IS NULL
       OR DATEADD( MI, @LockExpireMinutes, LockStartDt )< CURRENT_TIMESTAMP )
 -- Check for potential out of work condition
 IF ( @ClientId IS NULL )
 BEGIN -- We didn't get a client id, there are either none unprocessed or none
       -- with expired locks.  We need to find out which..
      IF EXISTS (SELECT 'X'
                  FROM   dbo.tempWorkLoadParentSingleChildMethod
                  WHERE  ProcessDt IS NULL )
      BEGIN -- We have work to do but none with expired locks.
           WAITFOR DELAY '00:00:30' -- Hold for 30 seconds before trying again
           GOTO GetNewClientId -- Try again
      END
      -- If we got here there was no work to do because there were no unprocessed
      -- client ids.  We know this because to get into this block the ClientId had
      -- to be NULL and we already checked for the existence of unprocessed clients
      -- regardless of lock status.  We are done with this run.
      SELECT @ClientId = NULL
 END
 ELSE
 BEGIN -- We got a client id, now let's try to lock it.
      BEGIN TRAN
      -- Set ProcessDt and LockStartDt to the values already in the table. This
      -- locks the row to THIS process and blocks edits elsewhere until COMMIT.
      UPDATE dbo.tempWorkLoadParentSingleChildMethod
      SET    ProcessDt   = ProcessDt,
             LockStartDt = LockStartDt
      OUTPUT INSERTED.ProcessDt, INSERTED.LockStartDt INTO @LockWk
      WHERE  ClientId = @ClientId
      -- Let's look at what we got from the update
      IF EXISTS (SELECT 'X' FROM @LockWk WHERE LockStartDt IS NULL)
      BEGIN -- Record is not locked by any process, it is ours then.
           UPDATE dbo.tempWorkLoadParentSingleChildMethod
           SET    LockStartDt = CURRENT_TIMESTAMP
           WHERE  ClientId = @ClientId
           SET @NoDataAvailFlag = 0
           COMMIT -- Commit the transaction which clears the lock
      END
      ELSE
      BEGIN -- The record was previously locked, we need to find out if it is an
            -- old lock that was expired
           IF EXISTS ( SELECT 'X'
                       FROM   @LockWk
                       WHERE DATEADD( MI, @LockExpireMinutes, LockStartDt )
                                             < CURRENT_TIMESTAMP )
           BEGIN -- Expired lock, it is ours then
                UPDATE dbo.tempWorkLoadParentSingleChildMethod
                SET    LockStartDt = CURRENT_TIMESTAMP
                WHERE  ClientId = @ClientId
                SET @NoDataAvailFlag = 0
                COMMIT -- Commit the transaction which clears the lock
           END
           ELSE
           BEGIN -- Non-expired lock, to get here the lock would have to be
                 -- fairly new, since the initial query to get ClientId only
                 -- looks for unprocessed records with no lock or expired locks.
                COMMIT -- Commit the transaction which clears the lock
                GOTO GetNewClientId -- Try again
           END
      END         
 END
 SELECT ClientId       = @ClientId,
        NoDataAvailFlag = @NoDataAvailFlag
 GO
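For readers who want to experiment with the claim logic outside SQL Server, the following Python sketch reproduces its steps against an in-memory SQLite table. It is an approximation, not a port. SQLite has no row-level locks, so BEGIN IMMEDIATE, which takes the database write lock, stands in for the dummy UPDATE inside a transaction. Times are plain numbers of seconds. The table name WorkLoad, the helper names, and the injectable now/sleep parameters are illustrative assumptions.

```python
import sqlite3
import time

LOCK_EXPIRE_SECONDS = 10 * 60  # @LockExpireMinutes = 10
RETRY_WAIT_SECONDS = 30        # WAITFOR DELAY '00:00:30'


def create_control_table(conn: sqlite3.Connection, client_ids) -> None:
    """Stand-in for tempWorkLoadParentSingleChildMethod (times stored as seconds)."""
    conn.execute("CREATE TABLE WorkLoad (ClientId INTEGER PRIMARY KEY, "
                 "LockStartDt REAL, ProcessDt REAL)")
    conn.executemany("INSERT INTO WorkLoad (ClientId) VALUES (?)",
                     [(c,) for c in client_ids])


def claim_client_id(conn: sqlite3.Connection, now=time.time, sleep=time.sleep):
    """Return a ClientId locked for this worker, or None if no work remains.

    conn must be opened with isolation_level=None so that the explicit
    BEGIN IMMEDIATE/COMMIT below are not mixed with sqlite3's implicit transactions.
    """
    while True:  # each pass corresponds to GOTO GetNewClientId
        (client_id,) = conn.execute(
            "SELECT MIN(ClientId) FROM WorkLoad WHERE ProcessDt IS NULL "
            "AND (LockStartDt IS NULL OR LockStartDt < ?)",
            (now() - LOCK_EXPIRE_SECONDS,)).fetchone()
        if client_id is None:
            unprocessed = conn.execute(
                "SELECT 1 FROM WorkLoad WHERE ProcessDt IS NULL LIMIT 1").fetchone()
            if unprocessed is None:
                return None                 # NoDataAvailFlag stays 1
            sleep(RETRY_WAIT_SECONDS)       # work remains, but every row holds a live lock
            continue
        # Re-read and claim the row while holding the write lock, so two workers
        # that picked the same ClientId cannot both claim it.
        conn.execute("BEGIN IMMEDIATE")
        (lock_start,) = conn.execute(
            "SELECT LockStartDt FROM WorkLoad WHERE ClientId = ?",
            (client_id,)).fetchone()
        if lock_start is None or lock_start < now() - LOCK_EXPIRE_SECONDS:
            conn.execute("UPDATE WorkLoad SET LockStartDt = ? WHERE ClientId = ?",
                         (now(), client_id))
            conn.execute("COMMIT")
            return client_id                # NoDataAvailFlag = 0
        conn.execute("COMMIT")              # another worker locked it first; try again


conn = sqlite3.connect(":memory:", isolation_level=None)
create_control_table(conn, [3, 1, 2])
print(claim_client_id(conn))  # 1
print(claim_client_id(conn))  # 2
```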

The final step, which marks the record as complete, is very simple.

Set completion date task

The query that records completion (the ? placeholder is a parameter mapped to the ClientId obtained at the start of the run):

UPDATE [tempdb].[dbo].[tempWorkLoadParentSingleChildMethod]
SET ProcessDt = CURRENT_TIMESTAMP
WHERE ClientId = ?

The child package contains three other Execute SQL tasks. The two titled “Sample Work” are just placeholders for real work. The one titled “Update Lock of Client Id” is there for a long-running child process that could exceed the “lock” time of its ClientId. It simply updates LockStartDt in the control structure, giving this run of the child package another 10 minutes.
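The other two control-table writes the child makes, refreshing its lock and recording completion, can be modeled the same way. This is again a hypothetical Python/SQLite stand-in: the table name, function names, and numeric timestamps are assumptions. is_claimable applies the same filter as the first SELECT in the “Get Client Id For This Run” code, restricted to one ClientId.

```python
import sqlite3

LOCK_EXPIRE_SECONDS = 10 * 60  # assumption: same 10-minute window as the child package


def refresh_lock(conn, client_id, now):
    """'Update Lock of Client Id': restart this run's lock window."""
    conn.execute("UPDATE WorkLoad SET LockStartDt = ? WHERE ClientId = ?", (now, client_id))


def mark_processed(conn, client_id, now):
    """'Set completion date': a non-null ProcessDt takes the record out of play."""
    conn.execute("UPDATE WorkLoad SET ProcessDt = ? WHERE ClientId = ?", (now, client_id))


def is_claimable(conn, client_id, now):
    """True if another child could claim this ClientId right now."""
    row = conn.execute(
        "SELECT 1 FROM WorkLoad WHERE ClientId = ? AND ProcessDt IS NULL "
        "AND (LockStartDt IS NULL OR LockStartDt < ?)",
        (client_id, now - LOCK_EXPIRE_SECONDS)).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE WorkLoad (ClientId INTEGER PRIMARY KEY, "
             "LockStartDt REAL, ProcessDt REAL)")
conn.executemany("INSERT INTO WorkLoad VALUES (?, 0.0, NULL)", [(7,), (8,)])  # both locked at t = 0 s
refresh_lock(conn, 7, now=550.0)           # only run 7 refreshes its lock
print(is_claimable(conn, 7, now=900.0))    # False: refreshed lock is live until 1150 s
print(is_claimable(conn, 8, now=900.0))    # True: lock from t = 0 expired after 600 s
mark_processed(conn, 7, now=1000.0)
print(is_claimable(conn, 7, now=5000.0))   # False: the record is complete
```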

Also notice the small expression symbol immediately below the “Get Client Id For This Run” task. It is there because the precedence constraint for running “Sample Work 1” requires both that “Get Client Id For This Run” succeeds AND that the NoDataAvailFlag variable equals 0. When no ClientId is available, NoDataAvailFlag is set to 1. This lets the package exit successfully when there is nothing to do. The precedence configuration is shown here:

Precedence Constraint Editor

Back in the “ParentSingleChildMethod.dtsx” package:

Full Package

Let’s look at a new component in this example: the “Execution Time Limiter” in the upper right-hand corner. Its purpose is to keep the process from running away from us. Because the For Loops run until the processed count matches the work count, a record whose child package dies every time will be retried, and will fail, over and over, so the loops would never end. The limiter looks at the LockStartDt and ProcessDt fields. It stops the package from running more than ten minutes after the most recent LockStartDt, or more than twenty minutes after the most recent ProcessDt.
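The limiter’s rule fits in one function. This Python sketch is an interpretation of the description above, not the task’s actual code. It takes the two maximums as numbers of seconds, and the name check_runaway is hypothetical. It also assumes that, until a first lock or completion exists, the package start time is used instead, so that a child that dies at startup is still caught.

```python
from typing import Optional

LOCK_LIMIT_SECONDS = 10 * 60     # ten minutes after the latest LockStartDt
PROCESS_LIMIT_SECONDS = 20 * 60  # twenty minutes after the latest ProcessDt


def check_runaway(max_lock_start: Optional[float], max_process_dt: Optional[float],
                  started_at: float, now: float) -> None:
    """Raise RuntimeError when the parent package should be stopped."""
    # Assumption: with no lock or completion recorded yet, measure from the
    # package start so that a child dying at startup is still caught.
    last_lock = started_at if max_lock_start is None else max_lock_start
    last_done = started_at if max_process_dt is None else max_process_dt
    if now > last_lock + LOCK_LIMIT_SECONDS:
        raise RuntimeError("no record has been locked in over 10 minutes")
    if now > last_done + PROCESS_LIMIT_SECONDS:
        raise RuntimeError("no record has completed in over 20 minutes")


# Locks keep being refreshed, but nothing has finished for 20 minutes:
try:
    check_runaway(max_lock_start=1200.0, max_process_dt=0.0, started_at=0.0, now=1201.0)
except RuntimeError as exc:
    print(exc)  # no record has completed in over 20 minutes
```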

The reasoning is tied to the VERY disconnected design of this process. The children only know that they need to look at the control structure to find work, and the parent largely only knows whether there is still work to do. Because we want the design to tolerate the failure of any single process, we allow individual processes to fail and keep going. Recall that “SingleChild.dtsx” gets an id to work with, uses it for a bit, updates the lock to buy more time, uses the id a bit longer, and then marks the record complete.

But what happens if the process dies EVERY TIME? The lock expires, another run takes the record again, and it never completes successfully. The LockStartDt check handles the “died during processing but could complete if we tried again” case, and the ProcessDt check handles the case where processing would otherwise never stop. The limiter is also the only component configured to fail the package on failure. If it detects one of the runaway conditions, it completes with failure and forces the rest of the package to stop. When the child processes all complete successfully, it completes successfully shortly after them.

There are other options, including using event handlers to indicate package failure. There are also other ways to handle retries, such as adding a remaining-retry count to the control structure and decrementing it whenever the record is assigned to a process. My main issue with the retry count is that it does not account for packages that fail right at the beginning, before any processing has taken place. If the child package fails at startup, no count is ever decremented and the parent package never ends. A retry count would be a nice addition, but on its own it is not sufficient to terminate package execution when something goes wrong.
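The retry-count alternative can be sketched in a few lines, which also shows its blind spot. This is a hypothetical Python model: the sample control table has no retry column, and locks are ignored to keep the counting visible. Because a retry is charged only when a record is assigned, a child that dies at startup, before it is assigned anything, never uses one up.

```python
from typing import Dict, Optional, Set


def assign_with_retries(retries_left: Dict[int, int], processed: Set[int]) -> Optional[int]:
    """Hand out the lowest unprocessed ClientId that still has retries left.

    Hypothetical: models a retry counter the sample table does not have.
    """
    for client_id in sorted(retries_left):
        if client_id not in processed and retries_left[client_id] > 0:
            retries_left[client_id] -= 1  # the retry is charged at assignment time
            return client_id
    return None  # nothing assignable: every remaining record is out of retries


# A record whose child always dies after being assigned is given up after 3 tries.
retries = {1: 3}
print([assign_with_retries(retries, set()) for _ in range(4)])  # [1, 1, 1, None]
```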

Execution Time Limiter Task

Task properties

The final method is by far the most complex and the most “compact”, but it is also the most flexible. It is not recommended unless you fully understand the code and the process. It is called “ParentSingleChildScriptMethod.dtsx” and on the surface looks really simple:

New Package

The “Generate Data” task works like all of the other examples and cleans up the control structure. All of the real work occurs in the Script Task.

This example has only one package variable, “MaxWorkerThreads”, which sets the maximum number of child packages running at any one time:
Package Variables

The Script Task reads this variable, so it is listed in the task’s ReadOnlyVariables:

Script Task Editor

The script for this example is fairly large, so rather than covering every aspect we will highlight sections. To facilitate threaded execution, a class called Instance was added to the script; it does most of the heavy lifting:
Script Task Code

The Instance class contains the logic that builds and executes all of the threads. It uses a class called Thread, which represents each individual executable.
Thread Class

These classes are used to make building and executing threads of execution easier. All of the work is controlled from ScriptMain.Main:

' Microsoft SQL Server Integration Services Script Task
' Write scripts using Microsoft Visual Basic
' The ScriptMain class is the entry point of the Script Task.
Imports System
Imports System.Data
Imports System.Math
Imports Microsoft.SqlServer.Dts.Runtime
Public Class ScriptMain

Private _sConnString As String
Private _oConn As OleDb.OleDbConnection
Private _oReader As OleDb.OleDbDataReader
' The execution engine calls this method when the task executes.
' To access the object model, use the Dts object. Connections, variables, events,
' and logging features are available as static members of the Dts class.
' Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
'
' To open Code and Text Editor Help, press F1.
' To open Object Browser, press Ctrl+Alt+J.
Public Sub Main()
Dts.Events.FireInformation(0, "ScriptMain", "Main Starting", String.Empty, 0, False)
Dim _oInstance As Instance
Dim _iRC As Integer
Dim _dDt As Date
Dim _iMaxWaitTimeInMinutes As Int32 = 10
Try
Dts.Events.FireInformation(0, "ScriptMain", "Int Try", String.Empty, 0, False)
_sConnString = Dts.Connections.Item("localhost.tempdb").ConnectionString
_oConn = New OleDb.OleDbConnection(_sConnString)
_oConn.Open()
Dts.Events.FireInformation(0, "ScriptMain", "Conn Open", String.Empty, 0, False)
Dim _oCntCMD As New OleDb.OleDbCommand("SELECT WorkCount=COUNT(*) FROM [dbo].[tempWorkLoadParentSingleChildMethod]", _oConn)
_oReader = _oCntCMD.ExecuteReader
_oReader.Read() 'Get the row, we know there is only one.
_iRC = _oReader.GetInt32(0)
_oReader.Close() 'Close the reader, we'll reuse the connection later but not right yet.
Dts.Events.FireInformation(0, "ScriptMain", "RC: " & _iRC.ToString, String.Empty, 0, False)
If _iRC > 0 Then
'We have some work to do so we'll do it.
_oInstance = New Instance(CInt(Dts.Variables("MaxWorkerThreads").Value), _
"C:\Program Files\Microsoft SQL Server\90\DTS\Binn\", _
"C:\Temp\Multi_Run_Sample\SingleChild.dtsx")
'Once we instantiate the threads we need to sit here and watch to make sure they run.
_oInstance.InstantiateThreads()
Dts.Events.FireInformation(0, "ScriptMain", "Starting 2 minute wait", String.Empty, 0, False)
'Wait for two minutes before we start looking to see if the processes are running.
Threading.Thread.Sleep(120000)
Dts.Events.FireInformation(0, "ScriptMain", "2 minute wait completed", String.Empty, 0, False)
While 1 = 1
Dts.Events.FireInformation(0, "ScriptMain", "Thread Watch Top Of Cycle", String.Empty, 0, False)
'Get processing metrics
_oCntCMD = New OleDb.OleDbCommand("SELECT MaxLockStartDt = ( SELECT MAX( LockStartDt ) FROM dbo.tempWorkLoadParentSingleChildMethod (NOLOCK) ), UnprocessCnt = ( SELECT COUNT(*) FROM dbo.tempWorkLoadParentSingleChildMethod (NOLOCK) WHERE ProcessDt IS NULL )", _oConn)
_oReader = _oCntCMD.ExecuteReader
_oReader.Read() 'Get the row, we know there is only one.
If _oReader.IsDBNull(0) Then
_dDt = Nothing
Else
_dDt = _oReader.GetDateTime(0)
End If
_iRC = _oReader.GetInt32(1)
_oReader.Close() 'Close the reader
'Check for processed record count
If _iRC = 0 Then
Exit While 'All records are processed.
End If
'Handle case where there are no locked records, after startup this shouldn't happen
If _dDt = Nothing Then
'After waiting the initial start time this should not happen unless there is a problem
Throw New Exception("No processes have started. Aborting run.")
End If
'Check to see if we have exceeded the maximum wait time.
If _dDt.AddMinutes(_iMaxWaitTimeInMinutes) <= Now() Then
Throw New Exception("Maximum wait time has been exceeded. Aborting run.")
End If
'Wait for one minute before retrying.
Threading.Thread.Sleep(60000)
End While
End If
Dts.TaskResult = Dts.Results.Success
Catch ex As Exception
Dts.Events.FireInformation(0, "ScriptMain", "Exception: " & ex.ToString, String.Empty, 0, False)
Dts.TaskResult = Dts.Results.Failure
Finally
Dts.Events.FireInformation(0, "ScriptMain", "Finally", String.Empty, 0, False)
'We might need to cleanup our instance object and kill threads.
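If Not _oInstance Is Nothing Then
'When we are all done we kill anything left over.
_oInstance.KillThreads()
End If
'Close the connection opened at the start of Main.
If Not _oConn Is Nothing Then
_oConn.Close()
End If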
End Try
End Sub
End Class

This code works a lot like the “ParentSingleChildMethod.dtsx” package, but it is written entirely in VB.NET. A couple of elements are worth pointing out. The “Execution Time Limiter” is handled within the code (only the LockStartDt rule is implemented here), and its 10-minute window is set by this line:
Dim _iMaxWaitTimeInMinutes As Int32 = 10
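Stripped of the OLE DB plumbing, the watch loop in Main has the following shape. This Python paraphrase is for illustration only. poll stands in for the metrics query and returns (maximum LockStartDt or None, unprocessed count), and timestamps are numbers of seconds. The function name and the injected now/sleep callables are assumptions that make the loop testable.

```python
import time
from typing import Callable, Optional, Tuple


def watch_children(poll: Callable[[], Tuple[Optional[float], int]],
                   max_wait_minutes: int = 10,
                   now: Callable[[], float] = time.time,
                   sleep: Callable[[float], None] = time.sleep) -> None:
    """Return once every record is processed; raise if the run looks stuck."""
    sleep(120)  # give the children two minutes to start
    while True:
        max_lock_start, unprocessed = poll()
        if unprocessed == 0:
            return  # all records are processed
        if max_lock_start is None:
            raise RuntimeError("No processes have started. Aborting run.")
        if max_lock_start + max_wait_minutes * 60 <= now():
            raise RuntimeError("Maximum wait time has been exceeded. Aborting run.")
        sleep(60)  # wait one minute before checking again


# Demo with a fake clock: two polls with work left, then everything is done.
clock = [0.0]


def fake_sleep(seconds: float) -> None:
    clock[0] += seconds


readings = iter([(100.0, 3), (150.0, 1), (150.0, 0)])
watch_children(lambda: next(readings), now=lambda: clock[0], sleep=fake_sleep)
print(clock[0])  # 240.0
```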

Another important element is the creation of the Instance object, where we specify the folder that contains DTExec.exe and the location of the package file. This example reuses “SingleChild.dtsx”. This is the line in question:

'We have some work to do so we'll do it.
_oInstance = New Instance(CInt(Dts.Variables("MaxWorkerThreads").Value), _
"C:\Program Files\Microsoft SQL Server\90\DTS\Binn\", _
"C:\Temp\Multi_Run_Sample\SingleChild.dtsx")

Throughout Main there are calls to “Dts.Events.FireInformation”, which emit status information about what the script is doing. Because this process has so little visibility in the designer, these messages are crucial for knowing what is happening under the hood.
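The source of the Instance and Thread classes is shown only as screenshots, so the following is not that code. It is a hypothetical Python sketch of the same idea: start MaxWorkerThreads copies of the child package as separate processes and kill whatever is still running when the parent finishes. The class and function names are invented for illustration. dtexec’s /F option, which runs a package stored in the file system, is real. Launching dtexec this way is an assumption about what Instance does with the Binn path it receives.

```python
import ntpath
import subprocess
from typing import List, Sequence


class ChildPool:
    """Run the same child command in max_workers separate OS processes."""

    def __init__(self, max_workers: int, command: Sequence[str]) -> None:
        self.max_workers = max_workers
        self.command = list(command)
        self.processes: List[subprocess.Popen] = []

    def start(self) -> None:
        # Every child pulls its own work from the control table, so the pool
        # only decides how many copies run at once (MaxWorkerThreads).
        for _ in range(self.max_workers):
            self.processes.append(subprocess.Popen(self.command))

    def running(self) -> int:
        """Number of children that have not exited yet."""
        return sum(1 for p in self.processes if p.poll() is None)

    def kill_all(self) -> None:
        """Counterpart of Instance.KillThreads(): stop anything left over."""
        for p in self.processes:
            if p.poll() is None:
                p.kill()
            p.wait()  # reap the process so it does not linger


def dtexec_command(binn_dir: str, package_path: str) -> List[str]:
    """Build a dtexec call for a file-system package (ntpath keeps Windows separators)."""
    return [ntpath.join(binn_dir, "DTExec.exe"), "/F", package_path]


# Hypothetical usage mirroring the script's arguments (needs SSIS installed):
# pool = ChildPool(3, dtexec_command(
#     "C:\\Program Files\\Microsoft SQL Server\\90\\DTS\\Binn\\",
#     "C:\\Temp\\Multi_Run_Sample\\SingleChild.dtsx"))
# pool.start()
```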

Conclusion

Keep in mind that there are practical limits on how many processes can run at a time, imposed primarily by memory and processing power. For example, on a small machine with 2GB of memory running SQL Server and the development tools, in most cases I couldn’t get more than 3 child packages running concurrently. The exception was the last example, where we had complete control over the number of threads. Also, if your processing is fairly intensive on the server, you may want to limit the number of processes, because more processes can make EVERY process take much longer: you might be running five processes at a time, but each one takes eight times longer to run. That is not what we are trying to accomplish. In most cases empirical testing is the only way to determine where the machine’s resources become saturated.
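The five-processes, eight-times-longer case is easy to check. Assuming each process does the same amount of work, throughput relative to a single process is the number of concurrent processes divided by the per-process slowdown. The helper name below is hypothetical.

```python
def relative_throughput(processes: int, slowdown: float) -> float:
    """Work finished per unit time, relative to one process running alone.

    Assumes every process does the same amount of work.
    """
    return processes / slowdown


print(relative_throughput(5, 8))    # 0.625: worse than running them one at a time
print(relative_throughput(3, 1.5))  # 2.0: here concurrency pays off
```

Anything below 1.0 means the extra concurrency is costing time rather than saving it, which is the saturation point the empirical testing is looking for.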

Resources
