Недавно мне было поручено создать автоматизированный процесс ETL, который закачивает данные в таблицы на основе плоского имени файла путем чтения файла основного сопоставления. Я решил пойти с SqlBulkCopy, и все, казалось, было хорошо. Интерфейс IDataReader был реализован для чтения плоских файлов, метаданных SQL Server, снабженных количеством столбцов для сопоставления данных «один к одному», все работало, пока я не наткнулся на файл с пустыми строками. SqlBulkCopy генерирует исключение, говорящее, что «данное значение типа String из источника данных не может быть преобразовано в тип int указанного целевого столбца.». Конец истории, это даже не заботится о том, что тип БД для этого столбца INT NULL. Я знаю, что могу дальше интерпретировать метаданные, извлекать типы данных для заданных столбцов, строить DataSet на основе извлеченной информации, повторно преобразовывать данные из плоских файлов и получать себе хорошее строго типизированное решение, которое будет работать, но я ленивый парень, который чувствует, что его счастье было жестоко разорвано Microsoft или моей собственной некомпетентностью, если кто-то знает о решении моей внезапной проблемы. Спасибо за ваше время.
List<String> fileNames;
DateTime startJobTime = DateTime.Now;
Console.WriteLine("---------------------------------------------");
Console.WriteLine("Start Time: " + startJobTime);
Console.WriteLine("---------------------------------------------");
using (SqlConnection sqlCon = new SqlConnection(sqlConnection))
{
try
{
sqlCon.Open();
sqlCon.ChangeDatabase(edwDBName);
// Get service information for staging job
UnivStage us = GetStagingJobInfo(jobName, sqlCon);
us.StartJobTime = startJobTime;
// Get a list of file names
fileNames = GetFileList(us, args);
if (fileNames.Count > 0)
{
// Truncate Staging Table
TruncateStagingTable(us, sqlCon);
// Close and dispose of sqlCon2 connection
sqlCon.Close();
Console.WriteLine("Processing files: ");
foreach (String fileName in fileNames)
Console.WriteLine(fileName);
Console.WriteLine();
}
else
{
Console.WriteLine("No files to process.");
Environment.Exit(0);
}
// Re-open Sql Connection
sqlCon.Open();
sqlCon.ChangeDatabase(stagingDBName);
foreach (String filePath in fileNames)
{
using (SqlTransaction sqlTran = sqlCon.BeginTransaction())
{
using (FlatFileReader ffReader = new FlatFileReader(filePath, us.Delimiter))
{
using (SqlBulkCopy sqlBulkCopy =
new SqlBulkCopy(sqlCon, SqlBulkCopyOptions.Default, sqlTran))
{
SqlConnection sqlCon2 = new SqlConnection(sqlConnection);
SetColumnList(sqlCon2, us, sqlBulkCopy);
sqlBulkCopy.BatchSize = 1000;
sqlBulkCopy.DestinationTableName =
us.StagingSchemaName + "." + us.StagingTableName;
sqlBulkCopy.WriteToServer(ffReader);
sqlTran.Commit();
sqlCon2.Close();
}
}
}
}
sqlCon.ChangeDatabase(edwDBName);
sqlCon.Close();
sqlCon.Open();
SetRowCount(us, sqlCon);
sqlCon.Close();
us.EndJobTime = DateTime.Now;
sqlCon.Open();
LogStagingProcess(us, sqlCon);
sqlCon.Close();
Console.WriteLine(us.ProcessedRowCount + " rows inserted.");
Console.WriteLine("---------------------------------------------");
Console.WriteLine("Success! End Time: " + us.EndJobTime);
Console.WriteLine("---------------------------------------------");
Console.ReadLine();
}
catch (SqlException e)
{
RenderExceptionMessagesAndExit(e,
"Exception have occured during an attempt to utilize SqlBulkCopy\n");
}
}