Labels

Monday, August 3, 2015

Programatically invoking multiple instances of SSIS package to execute them in parallel

using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.IO;
using System.Data.OleDb;
using System.Threading;
using System.Collections.Generic;

 public void Main()
        {
           
            var totalInstances = Convert.ToInt32(Dts.Variables["User::ParallelExecutionCount"].Value);
            var RowIteratorPkg = Dts.Variables["User::RowIteratorFilePath"].Value.ToString();

            try
            {
                if (File.Exists(RowIteratorPkg))
                {
                    var taskList = new System.Threading.Tasks.Task[totalInstances];

                    for (var i = 0; i < totalInstances; i++)
                    {
                        int runningInstance = i;
                        taskList[i] = System.Threading.Tasks.Task.Factory.StartNew(() => TriggerNewPackageInstance(RowIteratorPkg, runningInstance, totalInstances));
                    }

                    System.Threading.Tasks.Task.WaitAll(taskList);
                Dts.TaskResult = (int)ScriptResults.Success;
            }
                else
                {
                    logException(999, "RowIterator package does not exist in the configured location: " + RowIteratorPkg);
                    Dts.TaskResult = (int)ScriptResults.Failure;
                }
            }
            catch (Exception ex)
            {
                logException(999, "Exception in executing the RowIterator package: " + ex.Message);
                Dts.TaskResult = (int)ScriptResults.Failure;
            }
        }

        private void TriggerNewPackageInstance(string RowIteratorPkg, Int32 runningInstance, Int32 totalInstances)
        {
            try
            {
            var App = new Application();
            var pkg = new Package();
            pkg = App.LoadPackage(RowIteratorPkg, null);
                pkg.Variables["PackageInstanceNumber"].Value = (Int32)runningInstance;
                pkg.Variables["ParallelInstances"].Value = (Int32)totalInstances;
                pkg.Variables["DBConString"].Value = DbConnString;
            pkg.Execute();
        }
            catch (Exception ex)
            {
                logException(999, "Exception in executing the package: " + ex.Message + " Running Instance: " + runningInstance.ToString() + "totalInstance : " + totalInstances.ToString());
            }
        }