Итак ... У меня есть пакет SSIS, который создает и запускает пакеты SSIS в памяти. Создание "мета" пакета происходит внутри компонента сценария DataFlow, который настроен как место назначения. Основная c суть состоит в том, чтобы извлечь данные из одного или нескольких Oracle источников и поместить их в SQL серверную промежуточную базу данных (используется для бизнес-аналитики). У меня был большой успех с пакетом, за исключением случаев, когда я пытаюсь получить данные из источника Oracle с помощью DB Link (также известного как SCHEMA. TABLE_NAME@OTHER).
Диспетчер подключений Oracle создан не иначе, как если бы я не использовал ссылку на БД. Оператор Select, который я использую (содержащий синтаксис DB Link), создается задолго до того, как я запускаю пакет SSIS.
Когда я пытаюсь запустить «мета-пакет», содержащий один из этих операторов select, я получаю ошибки, связанные с использованием синтаксиса, который OLE DB SQL Server не распознает. Это означает, что вместо использования Oracle моего диспетчера исходных подключений он использует либо мой SQL диспетчер подключений к серверу, либо забывает, что это диспетчер подключений Oracle.
Опять же, ЕДИНСТВЕННАЯ разница - это синтаксис DB Link в операторе SELECT. Это буквально это.
Я попытался изменить свой код так, чтобы диспетчеры соединений создавались непосредственно перед тем, как они понадобятся. Но это не решает проблемы (просто вводит больше). Я пробовал использовать диспетчеры соединений из базового внешнего пакета. Но и здесь есть свои проблемы. Я даже попытался «повторно инициализировать» диспетчеры соединений прямо перед тем, как они понадобятся, что, похоже, ничего не делает.
Я использую SSIS 2017. Базы данных Oracle различаются по версиям, но все они 11 и выше.
Кто-нибудь когда-нибудь сталкивался с этим, и если да, то как вы это исправляли?
Я включаю часть своего кода ниже; Мне пришлось обрезать все, чтобы поместиться в этот пост. table и определяет, нужно ли просто усечь и загрузить; секция, которая (как только она определена как trun c и загружается) определяет, требуется ли сравнение удалений; и раздел, который определяет, нужно ли обрабатывать обновления / вставки. Я включил секцию Truncate и Load практически целиком, которая демонстрирует logi c практически повсюду.
#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using Microsoft.SqlServer.Dts.Pipeline;
using System.Data.SqlClient;
#endregion
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
private bool fireAgain = true;
private bool continueProcessing = true;
private string processingErrorMessage;
private string pathName;
private ConnectionManager srcConMgr;
private ConnectionManager dstConMgr;
public override void PreExecute()
{
base.PreExecute();
ComponentMetaData.FireInformation(10, $"Begining Pre Execute of Threaded Path", "Begining Pre Execute of First Threaded Path", "", 0, fireAgain);
}
public override void PostExecute()
{
base.PostExecute();
if (continueProcessing)
{
ComponentMetaData.FireInformation(10, $"Begining Post Execute of {pathName}", $"Begining Post Execute of {pathName}", "", 0, fireAgain);
}
else
{
ComponentMetaData.FireInformation(10, $"Begining Post Execute of {pathName}", $"There were Processing Errors along {pathName}: {processingErrorMessage}. Table was skipped for processing.", "", 0, fireAgain);
}
}
string BlobColumnToString(BlobColumn blobColumn)
{
if (blobColumn.IsNull)
return string.Empty;
var blobLength = Convert.ToInt32(blobColumn.Length);
var blobData = blobColumn.GetBlobData(0, blobLength);
var stringData = System.Text.Encoding.Unicode.GetString(blobData);
return stringData;
}
ConnectionManager CreateConnectionManager(Microsoft.SqlServer.Dts.Runtime.Package package, string conString, string conName, string conDescription)
{
ConnectionManager bldConnectionManager = package.Connections.Add("OLEDB");
bldConnectionManager.ConnectionString = conString;
bldConnectionManager.Name = conName;
bldConnectionManager.Description = conDescription;
bldConnectionManager.AcquireConnection(null);
return bldConnectionManager;
}
static void ReInitializeConnectionManager(ConnectionManager conMgr, string conString)
{
conMgr.ConnectionString = conString;
conMgr.AcquireConnection(null);
}
Executable CreateExecutable(Microsoft.SqlServer.Dts.Runtime.Package package, string exType)
{
Executable e = package.Executables.Add(exType);
return e;
}
MainPipe CreateDataFlowTask(Executable executable, string dfName)
{
Microsoft.SqlServer.Dts.Runtime.TaskHost thMainPipe = executable as Microsoft.SqlServer.Dts.Runtime.TaskHost;
thMainPipe.Name = dfName;
MainPipe dataFlowTask = thMainPipe.InnerObject as MainPipe;
(dataFlowTask as IDTSPipeline130).AutoAdjustBufferSize = true;
return dataFlowTask;
}
IDTSComponentMetaData100 CreateOLEDBComponent(Microsoft.SqlServer.Dts.Runtime.Application app, MainPipe dataFlowTask, string componentName, bool createSource)
{
//Create the DataFlow Task
IDTSComponentMetaData100 oleComponent = dataFlowTask.ComponentMetaDataCollection.New();
oleComponent.Name = componentName;
if (createSource == true)
{
oleComponent.ComponentClassID = app.PipelineComponentInfos["OLE DB Source"].CreationName;
}
else
{
oleComponent.ComponentClassID = app.PipelineComponentInfos["OLE DB Destination"].CreationName;
}
return oleComponent;
}
CManagedComponentWrapper CreateOLEDBSourceDesignTimeInstance(Microsoft.SqlServer.Dts.Runtime.Package package, IDTSComponentMetaData100 source, ConnectionManager sourceConnectionManager, string sourceSQL)
{
//Get the design-time instance of the component.
CManagedComponentWrapper srcDesignTime = source.Instantiate();
//Initialize the component
srcDesignTime.ProvideComponentProperties();
//Map the component to a connection manager
source.RuntimeConnectionCollection[0].ConnectionManagerID = sourceConnectionManager.ID;
source.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(package.Connections[0]);
//Set the OLE DB Source properties
srcDesignTime.SetComponentProperty("AccessMode", 2);
srcDesignTime.SetComponentProperty("SqlCommand", sourceSQL);
// Reinitialize the metadata
srcDesignTime.AcquireConnections(null);
srcDesignTime.ReinitializeMetaData();
srcDesignTime.ReleaseConnections();
return srcDesignTime;
}
CManagedComponentWrapper CreateOLEDBDestinationDesignTimeInstance(Microsoft.SqlServer.Dts.Runtime.Package package, IDTSComponentMetaData100 destination, ConnectionManager destinationConnectionManager, string stagingAlias)
{
CManagedComponentWrapper destDesignTime = destination.Instantiate();
destDesignTime.ProvideComponentProperties();
destination.RuntimeConnectionCollection[0].ConnectionManagerID = destinationConnectionManager.ID;
destination.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(package.Connections[1]);
destDesignTime.SetComponentProperty("AccessMode", 3);
destDesignTime.SetComponentProperty("FastLoadOptions", "TABLOCK");
destDesignTime.SetComponentProperty("OpenRowset", stagingAlias);
return destDesignTime;
}
Microsoft.SqlServer.Dts.Runtime.TaskHost CreateExecuteSQLTask(Executable e, ConnectionManager connectionManager, string taskName, string sqlStatement)
{
Microsoft.SqlServer.Dts.Runtime.TaskHost thExecuteSQL = e as Microsoft.SqlServer.Dts.Runtime.TaskHost;
thExecuteSQL.Properties["Connection"].SetValue(thExecuteSQL, connectionManager.ID);
thExecuteSQL.Properties["Name"].SetValue(thExecuteSQL, taskName);
thExecuteSQL.Properties["SqlStatementSource"].SetValue(thExecuteSQL, sqlStatement);
return thExecuteSQL;
}
static void UpdateStagingMetaDataPostProcess(string connectionString, int tableId, int processedItems, string fromMaxTimeStampString)
{
string sqlCommand = $"DECLARE @procTime datetime = GETDATE(); " +
$"EXEC Staging.meta.spUpdateInsertsUpdatesDeletesAfterProcessing @tableID = {tableId.ToString()}, " +
$"@processedItems = {processedItems.ToString()}, " +
$"@processedTimeStamp = @procTime, " +
$"@fromMaxTimeStamp = '{fromMaxTimeStampString}';";
using (SqlConnection connection = new SqlConnection(
connectionString))
{
SqlCommand command = new SqlCommand(sqlCommand, connection);
command.Connection.Open();
command.ExecuteNonQuery();
}
}
static void ExecuteStagingSQLCommand(string connectionString, string sqlCommand)
{
using (SqlConnection connection = new SqlConnection(
connectionString))
{
SqlCommand command = new SqlCommand(sqlCommand, connection);
command.Connection.Open();
command.ExecuteNonQuery();
}
}
public override void Input0_ProcessInputRow(Input0Buffer Row)
{
//removed a bunch of code to fit into post on stack overflow...
if (continueProcessing)
{
string serverName = this.Variables.ServerName;
string stagingConnectionString = @"server=" + serverName + imNotGivingYouMyConnectionInformation;
int processedItems = 0;
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Create the deletesExecutable and insertsUpdatesExecutable variables and assign null for {stagingAlias}", string.Empty, 0, ref fireAgain);
Executable deletesExecutable = null;
Executable insertsUpdatesExecutable = null;
//Create the Application and Package
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Create the Application and Package", string.Empty, 0, ref fireAgain);
Microsoft.SqlServer.Dts.Runtime.Application app = new Microsoft.SqlServer.Dts.Runtime.Application();
Microsoft.SqlServer.Dts.Runtime.Package package = new Microsoft.SqlServer.Dts.Runtime.Package();
Microsoft.SqlServer.Dts.Runtime.Connections pkgConns = package.Connections;
//Setup the Source Connection Manager
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Setup Source Connection Manager for {stagingAlias}", string.Empty, 0, ref fireAgain);
srcConMgr = CreateConnectionManager(package, connectionDetails,
connectionName + " OLEDB Connection Manager",
"Connection Manager for " + connectionName);
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Source Connection Manager for {stagingAlias}: {package.Connections[$"{connectionName} OLEDB Connection Manager"].Description}", string.Empty, 0, ref fireAgain);
//Setup the Destination Connection Manager
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Setup Destination Connection Manager", string.Empty, 0, ref fireAgain);
dstConMgr = CreateConnectionManager(package, destinationConnectionDetails,
"Staging OLEDB Connection Manager",
"Connection Manager for Staging.");
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Destination Connection Manager for {stagingAlias}: {package.Connections["Staging OLEDB Connection Manager"].Description}", string.Empty, 0, ref fireAgain);
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Connection Manager Count for {stagingAlias}: {package.Connections.Count}", string.Empty, 0, ref fireAgain);
//Determine what kind of staging transaction this will be (Trunc and Load or Update/Insert and Delete)
if (stagingClassification == "S" || this.Variables.ForceTruncateAndLoad == true) //this is a small, trunc and load table OR the user has elected to trunc and load everything that needs trunc'd and loaded yo
{
try
{
processedItems = 14; //we're updating inserts, updates, and deletes
//we're going to trunc and load here
//Create the Load pipeline, a.k.a., DataFlow task
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Create the Trunc and Load pipeline, a.k.a., DataFlow task for {stagingAlias}", string.Empty, 0, ref fireAgain);
Executable tl_e = CreateExecutable(package, "STOCK:PipelineTask");
MainPipe tl_dataFlowTask = CreateDataFlowTask(tl_e, "Trunc And Load");
//Set the IDTSComponentEvent handler to capture the details from any COMExceptions raised during package execution
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Set the IDTSComponentEvent handler to capture the details from any COMExceptions raised during package execution", string.Empty, 0, ref fireAgain);
ComponentEventHandler tl_events = new ComponentEventHandler();
tl_dataFlowTask.Events = DtsConvert.GetExtendedInterface(tl_events as IDTSComponentEvents);
ReInitializeConnectionManager(srcConMgr, connectionDetails);
//Create the OLEDB Source DataFlow Task
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Create the OLEDB Source DataFlow Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
IDTSComponentMetaData100 tl_source = CreateOLEDBComponent(app, tl_dataFlowTask, "OLEDBSource", true);
CManagedComponentWrapper tl_srcDesignTime = CreateOLEDBSourceDesignTimeInstance(package, tl_source, srcConMgr, tableSQL);
ReInitializeConnectionManager(dstConMgr, destinationConnectionDetails);
//Create the OLEDB destination DataFlow Task
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Create the OLEDB destination DataFlow Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
IDTSComponentMetaData100 tl_destination = CreateOLEDBComponent(app, tl_dataFlowTask, "OleDBDestination", false);
CManagedComponentWrapper tl_destDesignTime = CreateOLEDBDestinationDesignTimeInstance(package, tl_destination, dstConMgr, $"dbo.{stagingAlias}");
//Create the path between the two DataFlow Tasks
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Create the path between the two DataFlow Tasks for {stagingAlias}", string.Empty, 0, ref fireAgain);
IDTSPath100 tl_path = tl_dataFlowTask.PathCollection.New();
tl_path.AttachPathAndPropagateNotifications(tl_source.OutputCollection[0], tl_destination.InputCollection[0]);
//Configure the Destination's Meta Data
//############################################################
//>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
//>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
//Get the destination's default input and virtual input
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Get the destination's default input and virtual input", string.Empty, 0, ref fireAgain);
IDTSInput100 tl_input = tl_destination.InputCollection[0];
IDTSVirtualInput100 tl_vInput = tl_input.GetVirtualInput();
IDTSVirtualInputColumnCollection100 tl_vInputColumns = tl_vInput.VirtualInputColumnCollection;
//>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
//>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
//Initialize the destination dataflow
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Initialize the destination dataflow", string.Empty, 0, ref fireAgain);
tl_destDesignTime.AcquireConnections(null);
tl_destDesignTime.ReinitializeMetaData();
tl_destDesignTime.ReleaseConnections();
//>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
//>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
//Iterate through the virtual input column collection and map to destination
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Iterate through the virtual input column collection and map to destination for {stagingAlias}", string.Empty, 0, ref fireAgain);
foreach (IDTSVirtualInputColumn100 tl_vColumn in tl_vInputColumns)
{
var inputColumn = tl_destDesignTime.SetUsageType(tl_input.ID, tl_vInput, tl_vColumn.LineageID, DTSUsageType.UT_READONLY);
var externalColumn = tl_input.ExternalMetadataColumnCollection[inputColumn.Name];
tl_destDesignTime.MapInputColumn(tl_input.ID, inputColumn.ID, externalColumn.ID);
}
//############################################################
//Create the Truncate Execute SQL Task
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Create the Truncation Execute SQL Task for {stagingAlias}", string.Empty, 0, ref fireAgain);
Executable trunc_e = CreateExecutable(package, "STOCK:SQLTask");
Microsoft.SqlServer.Dts.Runtime.TaskHost thTruncate = CreateExecuteSQLTask(trunc_e, dstConMgr, $"TRUNCATE {stagingAlias}", $"TRUNCATE TABLE dbo.{stagingAlias}");
//Create the Precedence Constraint between the Execute SQL Task and the Pipeline Task
ComponentMetaData.FireInformation(0, "Progress Status", $"{pathName}: Create the precedence constraint between Execute SQL Task and DataFlow for {stagingAlias}", string.Empty, 0, ref fireAgain);
Microsoft.SqlServer.Dts.Runtime.PrecedenceConstraint tl_Constraint = package.PrecedenceConstraints.Add(trunc_e, tl_e);
}
catch (Exception tl_exc)
{
ComponentMetaData.FireWarning(0, "Trunc And Load Package Creation Failure", $"{pathName}: Trunc and Load Package Creation Failure for {stagingAlias} Custom Component Event Type: {CustomComponentEvent.type}, Sub Component: {CustomComponentEvent.subComponent}, Description: {CustomComponentEvent.description}", string.Empty, 0);
ComponentMetaData.FireWarning(0, "Trunc And Load Package Creation Failure", $"{pathName}: Trunc and Load Package Creation Failure for {stagingAlias} Error Code: {tl_exc.HResult}, Error Message: {tl_exc.Message}, Source Table SQL: {tableSQL}, Source Connection Details: {srcConMgr.ConnectionString}", string.Empty, 0);
continueProcessing = false;
}
}
else
{
//removed a bunch of code for pasting into stack overlfow
Большое спасибо за вашу помощь!