Я новичок в TPL Dataflow, это мой первый конвейер.
Я перевожу 1M строк до EmpresasBuffer
. В пакетах по 1000 EmpresasBuffer
отправляет данные на SaveBlock
, что сохраняет их в базу данных. После сохранения ExitBlock
показывает, сколько строк было обработано до сих пор.
Это работает с меньшим количеством строк. Если я попытаюсь нажать 10 000 строк, он будет работать, как и ожидалось, но с 1 М строк, он молча завершится неудачей.
Проверка кода Я вижу, что SaveBlock
и ExitBlock
установлены как Завершено ,но это происходит до того, как EmpresasBuffer.Complete()
был вызван.
Это код, который создает конвейер:
var EmpresasBuffer = new BatchBlock<EmpresaModel>(1000,
new GroupingDataflowBlockOptions {
EnsureOrdered = false,
BoundedCapacity = 10000 });
var SaveBlock = new TransformBlock<EmpresaModel[], int>(
async x => await SalvarEmpresaAsync(x),
new ExecutionDataflowBlockOptions {
EnsureOrdered = false,
MaxDegreeOfParallelism = 32,
BoundedCapacity = 10000 });
car ExitBlock = new ActionBlock<int>(x => {
Console.WriteLine($"[{DateTime.Now.ToString("dd/MM/yyyy HH:mm:ss")}] Registros Importados: {TotalRows += x}");
});
EmpresasBuffer.LinkTo(SaveBlock, new DataflowLinkOptions { PropagateCompletion = true });
SaveBlock.LinkTo(ExitBlock, new DataflowLinkOptions { PropagateCompletion = true });
И этот код я использую для передачи данных в EmpresasBuffer
:
using (var reader = new StreamReader(stream))
{
string linha;
while ((linha = reader.ReadLine()) != null)
{
//{...creates empresa class...}
await EmpresasBuffer.SendAsync(empresa);
}
}
//{...after the end of loop...}
EmpresasBuffer.Complete();
await ExitBlock.Completion;
Console.WriteLine($"[{DateTime.Now.ToString("dd/MM/yyyy HH:mm:ss")}] Process finished!");
Что не так с моим кодом?
Это код SalvarEmpresaAsync
:
public static async Task<int> SalvarEmpresaAsync(EmpresaModel[] modelArray)
{
using (var Connection = new SqlConnection(ConnectionString))
{
Connection.Open();
var TaskEmpresa = BulkInsert<EmpresaModel>(Connection, "Empresas", modelArray, PropertiesEmpresa);
var TaskQuadroSocietario = BulkInsert<QuadroSocietarioModel>(Connection, "QuadroSocietario", modelArray.SelectMany(x => x.QuadrosSocietarios).AsEnumerable(), PropertiesQuadroSocietario);
await TaskEmpresa;
await TaskQuadroSocietario;
}
return modelArray.Length;
}
public static async Task BulkInsert<T>(SqlConnection connection, string tableName, IEnumerable<T> data, string[] columns) where T: class
{
using (var BulkCopy = new SqlBulkCopy(connection))
using (var reader = ObjectReader.Create(data, columns))
{
foreach (string property in columns)
{
BulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(property, property));
}
BulkCopy.DestinationTableName = tableName;
await BulkCopy.WriteToServerAsync(reader);
}
}