SQLServerCentral Article

Importing Files in Parallel with SSIS


Introduction

With “big data” becoming more prevalent in everyday scenarios, we need faster and more efficient ways to import data. When you have to import thousands, if not millions, of files, a few cloud-based solutions make this fairly simple. However, policies within many organisations mean that cloud solutions may not yet be a feasible option. Nevertheless, many companies already have SSIS implementations that they can leverage instead.

Problem

The main problem is that when you import thousands of files with exactly the same structure, a single package still processes them one at a time. Even if each file takes only a second to import, a few thousand files will take a few thousand seconds (3,000 files at one second each is 50 minutes).

Solution

One way to speed up the import is to run multiple instances of the import package in parallel, with each instance processing its own share of the files.
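As a rough illustration of the potential saving, the following C# sketch estimates the wall-clock time of a sequential and a parallel import. It assumes every file takes the same time and that the instances don't slow each other down, which is optimistic because they share the source disk and the destination table. The class and method names are illustrative only.

```csharp
using System;

// Back-of-the-envelope estimate only (hypothetical helper, not part of the article's code).
// Assumes a uniform time per file and no contention between package instances.
public static class ImportEstimate
{
    // Sequential: every file is processed one after another by a single package.
    public static double SequentialSeconds(int fileCount, double secondsPerFile)
    {
        return fileCount * secondsPerFile;
    }

    // Parallel: the busiest instance receives ceil(fileCount / instances) files,
    // and the import finishes when that instance finishes.
    public static double ParallelSeconds(int fileCount, double secondsPerFile, int instances)
    {
        if (instances < 1)
            throw new ArgumentOutOfRangeException(nameof(instances), "At least one instance is required.");
        int filesPerInstance = (fileCount + instances - 1) / instances;  // integer ceiling
        return filesPerInstance * secondsPerFile;
    }
}
```

For example, 3,000 files at one second each take 3,000 seconds sequentially, whereas eight instances would need 375 seconds in the best case. Real gains will be smaller once I/O and locking on the destination are taken into account.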

Step 1

Create an SSIS project and add the following project-level parameters:

  • SourceDirectory – the path to the folder containing the source files
  • PackageName – the full path of the package (.dtsx file) that will import the files
  • NoOfPackageInstances – the number of instances of that package to run in parallel
  • FileMask – the filename filter (for example *.csv), in case you only want to process certain files within the folder

Figure 1
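SourceDirectory and FileMask are passed straight to Directory.GetFiles in the Script Task, so FileMask uses ordinary wildcard syntax ("*" for any run of characters, "?" for a single character). The sketch below shows that behaviour on its own; FileMaskDemo and ListFiles are hypothetical names. Only the top-level folder is searched, and only files matching the mask are returned.

```csharp
using System;
using System.IO;

// Hypothetical helper illustrating how SourceDirectory and FileMask combine.
public static class FileMaskDemo
{
    public static string[] ListFiles(string sourceDirectory, string fileMask)
    {
        // Fail early with a clear message instead of partway through the import
        if (!Directory.Exists(sourceDirectory))
            throw new DirectoryNotFoundException("Source directory not found: " + sourceDirectory);

        // Same call as the Script Task: top directory only, wildcard filter on file names
        return Directory.GetFiles(sourceDirectory, fileMask);
    }
}
```

According to the Directory.GetFiles documentation, on .NET Framework (which SSIS Script Tasks use) a mask with a three-character extension such as *.csv also matches longer extensions that begin with it, such as .csvx, so it's worth making sure no such files sit in the source folder.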

Step 2

Create a new package with a “Script Task” component:

Figure 2

Step 3

Open the Script Task, add the project-level parameters to ReadOnlyVariables, and then click “Edit Script”:

Figure 3

Step 4 (see the full code at the end of the article)

Declare variables and assign them the values of the project parameters, then declare variables to keep track of the files to be imported and the group each file is assigned to:

Figure 4
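Because the project parameters arrive in the Script Task as objects, it can help to convert and validate them in one place before the grouping loop runs. The following ImportSettings class is a hypothetical helper, not part of the original script. In particular, it rejects a NoOfPackageInstances of 0, which would otherwise cause a DivideByZeroException in the modulo used to pick a file's group.

```csharp
using System;

// Hypothetical helper (not in the article's script): converts the raw parameter
// values read from Dts.Variables and checks them before any files are grouped.
public sealed class ImportSettings
{
    public string SourceDirectory { get; }
    public string FileMask { get; }
    public short NoOfPackageInstances { get; }
    public string PackageName { get; }

    public ImportSettings(object sourceDirectory, object fileMask, object noOfPackageInstances, object packageName)
    {
        // Convert.ToString(null) returns an empty string; Convert.ToInt16(null) returns 0
        SourceDirectory = Convert.ToString(sourceDirectory);
        FileMask = Convert.ToString(fileMask);
        NoOfPackageInstances = Convert.ToInt16(noOfPackageInstances);
        PackageName = Convert.ToString(packageName);

        // At least one instance is needed, or "fileCount % noOfPackageInstances" divides by zero
        if (NoOfPackageInstances < 1)
            throw new ArgumentOutOfRangeException(nameof(noOfPackageInstances), "NoOfPackageInstances must be at least 1.");
        if (string.IsNullOrWhiteSpace(SourceDirectory))
            throw new ArgumentException("SourceDirectory is empty.", nameof(sourceDirectory));
        if (string.IsNullOrWhiteSpace(PackageName))
            throw new ArgumentException("PackageName is empty.", nameof(packageName));
    }
}
```

Inside Main() it could be built from the four Dts.Variables["$Project::…"].Value objects that the script already reads, so a bad parameter fails the task with a descriptive message before any package instance starts.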

Loop through each file and assign it to a group in round-robin fashion, using the file's position modulo the number of package instances:

Figure 5
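The grouping step can be isolated as a small function, sketched below under the hypothetical name FileGrouping.Partition. Files are dealt out round-robin, so group sizes differ by at most one, and every group is created up front so that a group is empty rather than null when there are fewer files than package instances.

```csharp
using System;
using System.Collections.Generic;

// Sketch of the round-robin grouping step (hypothetical names).
public static class FileGrouping
{
    public static List<string>[] Partition(IEnumerable<string> files, int noOfPackageInstances)
    {
        if (noOfPackageInstances < 1)
            throw new ArgumentOutOfRangeException(nameof(noOfPackageInstances));

        // Create every group first so none is ever null
        var groups = new List<string>[noOfPackageInstances];
        for (int g = 0; g < noOfPackageInstances; g++)
            groups[g] = new List<string>();

        // File 0 goes to group 0, file 1 to group 1, and so on, wrapping around
        int index = 0;
        foreach (string file in files)
        {
            groups[index % noOfPackageInstances].Add(file);
            index++;
        }
        return groups;
    }
}
```

For example, splitting five files across two instances gives the first instance three files and the second instance two.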

Loop through each file group, execute an instance of the package for it on a separate thread, and then wait for all of the threads to finish:

Figure 6
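Waiting for the threads is the subtle part of this step. The sketch below replaces ExecutePackage with a delegate (runInstance, a hypothetical stand-in) so that it runs outside SSIS. It waits with Thread.Join rather than polling for ThreadState.Running, because a thread blocked on I/O or a lock reports WaitSleepJoin and would be treated as finished. It also catches exceptions on each worker thread, since an exception thrown there is not caught by the try/catch in Main.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Sketch of "one thread per group, then wait for all of them" (hypothetical names).
public static class ParallelRunner
{
    // Returns one success flag per group; runInstance stands in for ExecutePackage.
    public static bool[] RunAll(List<string>[] groups, Func<List<string>, bool> runInstance)
    {
        var results = new bool[groups.Length];
        var threads = new List<Thread>();

        for (int i = 0; i < groups.Length; i++)
        {
            int j = i;  // copy the loop variable so each lambda captures its own index
            if (groups[j] == null || groups[j].Count == 0)
            {
                results[j] = true;  // nothing to import for this instance
                continue;
            }
            var t = new Thread(() =>
            {
                // Record failures instead of letting an exception end the process
                try { results[j] = runInstance(groups[j]); }
                catch (Exception) { results[j] = false; }
            });
            t.Start();
            threads.Add(t);
        }

        // Join blocks until each thread has finished, whatever state it is currently in
        foreach (Thread t in threads)
            t.Join();

        return results;
    }
}
```

The caller can then fail the Script Task if any flag is false, rather than reporting success while an instance has quietly failed.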

The ExecutePackage method:

Figure 7

Step 5

Create a new package and add some variables:

  • FileList – the list of files to process. Its name must match the one used in the code in Step 4, it must be of type Object, and it must be writable (not read-only)
  • FilePath – the full path of the individual file being processed

Figure 8

Step 6

In the new package, create a Foreach Loop Container with a Data Flow Task inside it:

Figure 9

Step 7

In the Foreach Loop Container, select “Foreach From Variable Enumerator”, and select the “User::FileList” variable:

Figure 10

Then select “User::FilePath” under “Variable Mappings”:

Figure 11
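Steps 5 and 7 together behave roughly like the C# loop below. This is an analogy, not how SSIS implements the enumerator: User::FileList holds the List<string> passed in by the parent package, the Foreach From Variable Enumerator walks the enumerable value stored in that variable, and the Variable Mappings entry copies each item into User::FilePath before the Data Flow Task runs. ForeachFromVariableAnalogy and its Run method are hypothetical names.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Rough analogy of the child package's Foreach loop (not SSIS's actual implementation).
public static class ForeachFromVariableAnalogy
{
    // fileList plays the role of User::FileList; dataFlowTask plays the Data Flow Task.
    public static List<string> Run(object fileList, Action<string> dataFlowTask)
    {
        var items = fileList as IEnumerable;
        if (items == null)
            throw new InvalidOperationException("FileList does not hold an enumerable value.");

        var processed = new List<string>();
        foreach (object item in items)
        {
            string filePath = (string)item;  // the "User::FilePath" variable mapping
            dataFlowTask(filePath);          // one run of the Data Flow per file
            processed.Add(filePath);
        }
        return processed;
    }
}
```

Each pass through the loop reuses the same Data Flow, and only the value of FilePath (and therefore the source connection) changes between iterations.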

Step 8

Update the Data Flow so that it has a source, destination and any required transformations:

Figure 12

Step 9

In the source connection manager, set an expression on the ConnectionString property to @[User::FilePath] (for a Flat File connection manager, the connection string is simply the file path):

Figure 13

Step 10

In the destination editor (for example, the Connection Manager page of the OLE DB Destination Editor), make sure that "Table lock" is not selected; otherwise the package instances may block one another instead of writing to the table concurrently:

Figure 14

Full Script Task Code

Please excuse my code if it isn't the neatest or most efficient way of doing things.

#region Namespaces
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Collections;
using System.Collections.Generic;
using Microsoft.SqlServer.Dts.Runtime;
#endregion

namespace ST_e1209e3a____
{
    /// <summary>
    /// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
    /// or parent of this class.
    /// </summary>
    [Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        public void Main()
        {
            try
            {
                // Project-level parameters (added to the task's ReadOnlyVariables)
                string sourceDirectory = Dts.Variables["$Project::SourceDirectory"].Value.ToString();
                string fileMask = Dts.Variables["$Project::FileMask"].Value.ToString();
                Int16 noOfPackageInstances = Convert.ToInt16(Dts.Variables["$Project::NoOfPackageInstances"].Value);
                string packageName = Dts.Variables["$Project::PackageName"].Value.ToString();

                // One list of files per package instance. Create every list up front so that
                // no group is left null when there are fewer files than instances.
                List<string>[] allFiles = new List<string>[noOfPackageInstances];
                for (int g = 0; g < noOfPackageInstances; g++)
                {
                    allFiles[g] = new List<string>();
                }

                // Deal the files out round-robin across the groups
                int fileCount = 0;
                foreach (string file in Directory.GetFiles(sourceDirectory, fileMask))
                {
                    allFiles[fileCount % noOfPackageInstances].Add(file);
                    fileCount++;
                }

                // Start one thread per non-empty group; each thread runs its own package instance
                var threads = new List<Thread>();
                var results = new DTSExecResult[noOfPackageInstances];
                for (int i = 0; i < noOfPackageInstances; i++)
                {
                    int j = i;  // copy the loop variable so each lambda captures its own value
                    if (allFiles[j].Count == 0)
                    {
                        results[j] = DTSExecResult.Success;  // nothing to import for this instance
                        continue;
                    }
                    Thread t = new Thread(() => results[j] = ExecutePackage(allFiles[j], packageName))
                    {
                        IsBackground = false
                    };
                    t.Start();
                    threads.Add(t);
                }

                // Wait for every instance to finish. Join() also covers threads that are blocked
                // (WaitSleepJoin), which a check for ThreadState.Running would treat as finished.
                foreach (Thread t in threads)
                {
                    t.Join();
                }

                // Fail the task if any package instance failed
                Dts.TaskResult = results.All(r => r == DTSExecResult.Success)
                    ? (int)ScriptResults.Success
                    : (int)ScriptResults.Failure;
            }
            catch (Exception ex)
            {
                Dts.Log(ex.Message, 0, null);
                Dts.TaskResult = (int)ScriptResults.Failure;
            }
        }

        public DTSExecResult ExecutePackage(List<string> fileList, string dtsxPackage)
        {
            try
            {
                Application app = new Application();
                Package pkg = app.LoadPackage(dtsxPackage, null);
                // User::FileList must be a writable Object variable in the child package
                pkg.Variables["User::FileList"].Value = fileList;
                return pkg.Execute(null, pkg.Variables, null, null, null);
            }
            catch (Exception)
            {
                // An unhandled exception on a worker thread would terminate the process,
                // so report it as a failed instance instead
                return DTSExecResult.Failure;
            }
        }

        #region ScriptResults declaration
        /// <summary>
        /// This enum provides a convenient shorthand within the scope of this class for setting the
        /// result of the script.
        /// 
        /// This code was generated automatically.
        /// </summary>
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };
        #endregion

    }
}
