Компоненты сценария SSIS DataFlow Oracle и SQL путаются диспетчеры подключения к серверу - PullRequest
1 голос
/ 16 июня 2020

Итак ... У меня есть пакет SSIS, который создает и запускает пакеты SSIS в памяти. Создание "мета" пакета происходит внутри компонента сценария DataFlow, который настроен как место назначения. Основная c суть состоит в том, чтобы извлечь данные из одного или нескольких Oracle источников и поместить их в SQL серверную промежуточную базу данных (используется для бизнес-аналитики). У меня был большой успех с пакетом, за исключением случаев, когда я пытаюсь получить данные из источника Oracle с помощью DB Link (также известного как SCHEMA. TABLE_NAME@OTHER).

Диспетчер подключений Oracle создан не иначе, как если бы я не использовал ссылку на БД. Оператор Select, который я использую (содержащий синтаксис DB Link), создается задолго до того, как я запускаю пакет SSIS.

Когда я пытаюсь запустить «мета-пакет», содержащий один из этих операторов select, я получаю ошибки, связанные с использованием синтаксиса, который OLE DB SQL Server не распознает. Это означает, что вместо использования Oracle моего диспетчера исходных подключений он использует либо мой SQL диспетчер подключений к серверу, либо забывает, что это диспетчер подключений Oracle.

Опять же, ЕДИНСТВЕННАЯ разница - это синтаксис DB Link в операторе SELECT. Это буквально это.

Я попытался изменить свой код так, чтобы диспетчеры соединений создавались непосредственно перед тем, как они понадобятся. Но это не решает проблемы (просто вводит больше). Я пробовал использовать диспетчеры соединений из базового внешнего пакета. Но и здесь есть свои проблемы. Я даже попытался «повторно инициализировать» диспетчеры соединений прямо перед тем, как они понадобятся, что, похоже, ничего не делает.

Я использую SSIS 2017. Базы данных Oracle различаются по версиям, но все они 11 и выше.

Кто-нибудь когда-нибудь сталкивался с этим, и если да, то как вы это исправляли?

Я включаю часть своего кода ниже; Мне пришлось обрезать все, чтобы поместиться в этот пост. table и определяет, нужно ли просто усечь и загрузить; секция, которая (как только она определена как trun c и загружается) определяет, требуется ли сравнение удалений; и раздел, который определяет, нужно ли обрабатывать обновления / вставки. Я включил секцию Truncate и Load практически целиком, которая демонстрирует logi c практически повсюду.

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using Microsoft.SqlServer.Dts.Pipeline;
using System.Data.SqlClient;
#endregion

[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    private bool fireAgain = true;
    private bool continueProcessing = true;
    private string processingErrorMessage;
    private string pathName;

    private ConnectionManager srcConMgr;
    private ConnectionManager dstConMgr;

    public override void PreExecute()
    {
        base.PreExecute();
        ComponentMetaData.FireInformation(10, $"Begining Pre Execute of Threaded Path", "Begining Pre Execute of First Threaded Path", "", 0, fireAgain);
    }

    public override void PostExecute()
    {
        base.PostExecute();
        if (continueProcessing)
        {
            ComponentMetaData.FireInformation(10, $"Begining Post Execute of {pathName}", $"Begining Post Execute of {pathName}", "", 0, fireAgain);
        }
        else
        {
            ComponentMetaData.FireInformation(10, $"Begining Post Execute of {pathName}", $"There were Processing Errors along {pathName}:  {processingErrorMessage}.  Table was skipped for processing.", "", 0, fireAgain);
        }
    }

    string BlobColumnToString(BlobColumn blobColumn)
    {
        if (blobColumn.IsNull)
            return string.Empty;

        var blobLength = Convert.ToInt32(blobColumn.Length);
        var blobData = blobColumn.GetBlobData(0, blobLength);
        var stringData = System.Text.Encoding.Unicode.GetString(blobData);

        return stringData;
    }

    ConnectionManager CreateConnectionManager(Microsoft.SqlServer.Dts.Runtime.Package package, string conString, string conName, string conDescription)
    {
        ConnectionManager bldConnectionManager = package.Connections.Add("OLEDB");
        bldConnectionManager.ConnectionString = conString;
        bldConnectionManager.Name = conName;
        bldConnectionManager.Description = conDescription;

        bldConnectionManager.AcquireConnection(null);

        return bldConnectionManager;
    }

    static void ReInitializeConnectionManager(ConnectionManager conMgr, string conString)
    {
        conMgr.ConnectionString = conString;

        conMgr.AcquireConnection(null);
    }

    Executable CreateExecutable(Microsoft.SqlServer.Dts.Runtime.Package package, string exType)
    {
        Executable e = package.Executables.Add(exType);

        return e;
    }

    MainPipe CreateDataFlowTask(Executable executable, string dfName)
    {
        Microsoft.SqlServer.Dts.Runtime.TaskHost thMainPipe = executable as Microsoft.SqlServer.Dts.Runtime.TaskHost;
        thMainPipe.Name = dfName;
        MainPipe dataFlowTask = thMainPipe.InnerObject as MainPipe;
        (dataFlowTask as IDTSPipeline130).AutoAdjustBufferSize = true;

        return dataFlowTask;
    }

    IDTSComponentMetaData100 CreateOLEDBComponent(Microsoft.SqlServer.Dts.Runtime.Application app, MainPipe dataFlowTask, string componentName, bool createSource)
    {
        //Create the DataFlow Task
        IDTSComponentMetaData100 oleComponent = dataFlowTask.ComponentMetaDataCollection.New();
        oleComponent.Name = componentName;

        if (createSource == true)
        {
            oleComponent.ComponentClassID = app.PipelineComponentInfos["OLE DB Source"].CreationName;
        }
        else
        {
            oleComponent.ComponentClassID = app.PipelineComponentInfos["OLE DB Destination"].CreationName;
        }

        return oleComponent;
    }

    CManagedComponentWrapper CreateOLEDBSourceDesignTimeInstance(Microsoft.SqlServer.Dts.Runtime.Package package, IDTSComponentMetaData100 source, ConnectionManager sourceConnectionManager, string sourceSQL)
    {
        //Get the design-time instance of the component.
        CManagedComponentWrapper srcDesignTime = source.Instantiate();

        //Initialize the component
        srcDesignTime.ProvideComponentProperties();

        //Map the component to a connection manager
        source.RuntimeConnectionCollection[0].ConnectionManagerID = sourceConnectionManager.ID;
        source.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(package.Connections[0]);

        //Set the OLE DB Source properties
        srcDesignTime.SetComponentProperty("AccessMode", 2);
        srcDesignTime.SetComponentProperty("SqlCommand", sourceSQL);

        // Reinitialize the metadata
        srcDesignTime.AcquireConnections(null);
        srcDesignTime.ReinitializeMetaData();
        srcDesignTime.ReleaseConnections();

        return srcDesignTime;
    }

    CManagedComponentWrapper CreateOLEDBDestinationDesignTimeInstance(Microsoft.SqlServer.Dts.Runtime.Package package, IDTSComponentMetaData100 destination, ConnectionManager destinationConnectionManager, string stagingAlias)
    {
        CManagedComponentWrapper destDesignTime = destination.Instantiate();
        destDesignTime.ProvideComponentProperties();

        destination.RuntimeConnectionCollection[0].ConnectionManagerID = destinationConnectionManager.ID;
        destination.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(package.Connections[1]);

        destDesignTime.SetComponentProperty("AccessMode", 3);
        destDesignTime.SetComponentProperty("FastLoadOptions", "TABLOCK");
        destDesignTime.SetComponentProperty("OpenRowset", stagingAlias);

        return destDesignTime;
    }


    Microsoft.SqlServer.Dts.Runtime.TaskHost CreateExecuteSQLTask(Executable e, ConnectionManager connectionManager, string taskName, string sqlStatement)
    {
        Microsoft.SqlServer.Dts.Runtime.TaskHost thExecuteSQL = e as Microsoft.SqlServer.Dts.Runtime.TaskHost;
        thExecuteSQL.Properties["Connection"].SetValue(thExecuteSQL, connectionManager.ID);
        thExecuteSQL.Properties["Name"].SetValue(thExecuteSQL, taskName);
        thExecuteSQL.Properties["SqlStatementSource"].SetValue(thExecuteSQL, sqlStatement);

        return thExecuteSQL;
    }


    static void UpdateStagingMetaDataPostProcess(string connectionString, int tableId, int processedItems, string fromMaxTimeStampString)
    {
        string sqlCommand = $"DECLARE @procTime datetime = GETDATE(); " +
                            $"EXEC Staging.meta.spUpdateInsertsUpdatesDeletesAfterProcessing @tableID = {tableId.ToString()}, " +
                                                                                            $"@processedItems = {processedItems.ToString()}, " +
                                                                                            $"@processedTimeStamp = @procTime, " +
                                                                                            $"@fromMaxTimeStamp = '{fromMaxTimeStampString}';";

        using (SqlConnection connection = new SqlConnection(
            connectionString))
        {
            SqlCommand command = new SqlCommand(sqlCommand, connection);
            command.Connection.Open();
            command.ExecuteNonQuery();
        }

    }

    static void ExecuteStagingSQLCommand(string connectionString, string sqlCommand)
    {
        using (SqlConnection connection = new SqlConnection(
            connectionString))
        {
            SqlCommand command = new SqlCommand(sqlCommand, connection);
            command.Connection.Open();
            command.ExecuteNonQuery();
        }
    }


    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        //removed a bunch of code to fit into post on stack overflow...

        if (continueProcessing)
        {
            string serverName = this.Variables.ServerName;
            string stagingConnectionString = @"server=" + serverName + imNotGivingYouMyConnectionInformation;
            int processedItems = 0;

            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the deletesExecutable and insertsUpdatesExecutable variables and assign null for {stagingAlias}", string.Empty, 0, ref fireAgain);
            Executable deletesExecutable = null;
            Executable insertsUpdatesExecutable = null;

            //Create the Application and Package
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the Application and Package", string.Empty, 0, ref fireAgain);
            Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
            Microsoft.SqlServer.Dts.Runtime.Package package = new Microsoft.SqlServer.Dts.Runtime.Package();


            Microsoft.SqlServer.Dts.Runtime.Connections pkgConns = package.Connections;

            //Setup the Source Connection Manager
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Setup Source Connection Manager for {stagingAlias}", string.Empty, 0, ref fireAgain);
            srcConMgr = CreateConnectionManager(package, connectionDetails,
                                            connectionName + " OLEDB Connection Manager",
                                            "Connection Manager for " + connectionName);
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Source Connection Manager for {stagingAlias}:  {package.Connections[$"{connectionName} OLEDB Connection Manager"].Description}", string.Empty, 0, ref fireAgain);


            //Setup the Destination Connection Manager
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Setup Destination Connection Manager", string.Empty, 0, ref fireAgain);
            dstConMgr = CreateConnectionManager(package, destinationConnectionDetails,
                                            "Staging OLEDB Connection Manager",
                                            "Connection Manager for Staging.");
            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Destination Connection Manager for {stagingAlias}:  {package.Connections["Staging OLEDB Connection Manager"].Description}", string.Empty, 0, ref fireAgain);

            ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Connection Manager Count for {stagingAlias}:  {package.Connections.Count}", string.Empty, 0, ref fireAgain);


            //Determine what kind of staging transaction this will be (Trunc and Load or Update/Insert and Delete)
            if (stagingClassification == "S" || this.Variables.ForceTruncateAndLoad == true) //this is a small, trunc and load table OR the user has elected to trunc and load everything that needs trunc'd and loaded yo
            {
                try
                {
                    processedItems = 14; //we're updating inserts, updates, and deletes

                    //we're going to trunc and load here
                    //Create the Load pipeline, a.k.a., DataFlow task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the Trunc and Load pipeline, a.k.a., DataFlow task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    Executable tl_e = CreateExecutable(package, "STOCK:PipelineTask");
                    MainPipe tl_dataFlowTask = CreateDataFlowTask(tl_e, "Trunc And Load");

                    //Set the IDTSComponentEvent handler to capture the details from any COMExceptions raised during package execution
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Set the IDTSComponentEvent handler to capture the details from any COMExceptions raised during package execution", string.Empty, 0, ref fireAgain);
                    ComponentEventHandler tl_events = new ComponentEventHandler();
                    tl_dataFlowTask.Events = DtsConvert.GetExtendedInterface(tl_events as IDTSComponentEvents);

                    ReInitializeConnectionManager(srcConMgr, connectionDetails);
                    //Create the OLEDB Source DataFlow Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the OLEDB Source DataFlow Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    IDTSComponentMetaData100 tl_source = CreateOLEDBComponent(app, tl_dataFlowTask, "OLEDBSource", true);
                    CManagedComponentWrapper tl_srcDesignTime = CreateOLEDBSourceDesignTimeInstance(package, tl_source, srcConMgr, tableSQL);


                    ReInitializeConnectionManager(dstConMgr, destinationConnectionDetails);
                    //Create the OLEDB destination DataFlow Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the OLEDB destination DataFlow Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    IDTSComponentMetaData100 tl_destination = CreateOLEDBComponent(app, tl_dataFlowTask, "OleDBDestination", false);
                    CManagedComponentWrapper tl_destDesignTime = CreateOLEDBDestinationDesignTimeInstance(package, tl_destination, dstConMgr, $"dbo.{stagingAlias}");


                    //Create the path between the two DataFlow Tasks
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the path between the two DataFlow Tasks for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    IDTSPath100 tl_path = tl_dataFlowTask.PathCollection.New();
                    tl_path.AttachPathAndPropagateNotifications(tl_source.OutputCollection[0], tl_destination.InputCollection[0]);


                    //Configure the Destination's Meta Data
                    //############################################################
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //Get the destination's default input and virtual input
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Get the destination's default input and virtual input", string.Empty, 0, ref fireAgain);
                    IDTSInput100 tl_input = tl_destination.InputCollection[0];
                    IDTSVirtualInput100 tl_vInput = tl_input.GetVirtualInput();
                    IDTSVirtualInputColumnCollection100 tl_vInputColumns = tl_vInput.VirtualInputColumnCollection;

                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //Initialize the destination dataflow
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Initialize the destination dataflow", string.Empty, 0, ref fireAgain);
                    tl_destDesignTime.AcquireConnections(null);
                    tl_destDesignTime.ReinitializeMetaData();
                    tl_destDesignTime.ReleaseConnections();

                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
                    //Iterate through the virtual input column collection and map to destination
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Iterate through the virtual input column collection and map to destination for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    foreach (IDTSVirtualInputColumn100 tl_vColumn in tl_vInputColumns)
                    {
                        var inputColumn = tl_destDesignTime.SetUsageType(tl_input.ID, tl_vInput, tl_vColumn.LineageID, DTSUsageType.UT_READONLY);
                        var externalColumn = tl_input.ExternalMetadataColumnCollection[inputColumn.Name];
                        tl_destDesignTime.MapInputColumn(tl_input.ID, inputColumn.ID, externalColumn.ID);
                    }
                    //############################################################


                    //Create the Truncate Execute SQL Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the Truncation Execute SQL Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    Executable trunc_e = CreateExecutable(package, "STOCK:SQLTask");
                    Microsoft.SqlServer.Dts.Runtime.TaskHost thTruncate = CreateExecuteSQLTask(trunc_e, dstConMgr, $"TRUNCATE {stagingAlias}", $"TRUNCATE TABLE dbo.{stagingAlias}");

                    //Create the Precedence Constraint between the Execute SQL Task and the Pipeline Task
                    ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}:  Create the precedence constraint between Execute SQL Task and DataFlow for {stagingAlias}", string.Empty, 0, ref fireAgain);
                    Microsoft.SqlServer.Dts.Runtime.PrecedenceConstraint tl_Constraint = package.PrecedenceConstraints.Add(trunc_e, tl_e);
                }
                catch (Exception tl_exc)
                {

                    ComponentMetaData.FireWarning(0, "Trunc And Load Package Creation Failure", $"{pathName}:  Trunc and Load Package Creation Failure for {stagingAlias} Custom Component Event Type:  {CustomComponentEvent.type}, Sub Component:  {CustomComponentEvent.subComponent}, Description:  {CustomComponentEvent.description}", string.Empty, 0);
                    ComponentMetaData.FireWarning(0, "Trunc And Load Package Creation Failure", $"{pathName}:  Trunc and Load Package Creation Failure for {stagingAlias} Error Code:  {tl_exc.HResult}, Error Message:  {tl_exc.Message}, Source Table SQL:  {tableSQL}, Source Connection Details:  {srcConMgr.ConnectionString}", string.Empty, 0);
                    continueProcessing = false;
                }


            }
            else
            {
                //removed a bunch of code for pasting into stack overlfow

Большое спасибо за вашу помощь!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...