Неожиданное окончание конвейера потока данных TPL - PullRequest
0 голосов
/ 24 октября 2019

Я новичок в TPL Dataflow, это мой первый конвейер.

Я перевожу 1M строк до EmpresasBuffer. В пакетах по 1000 EmpresasBuffer отправляет данные на SaveBlock, что сохраняет их в базу данных. После сохранения ExitBlock показывает, сколько строк было обработано до сих пор.

Это работает с меньшим количеством строк. Если я попытаюсь нажать 10 000 строк, он будет работать, как и ожидалось, но с 1 М строк, он молча завершится неудачей.

Проверка кода Я вижу, что SaveBlock и ExitBlock установлены как Завершено ,но это происходит до того, как EmpresasBuffer.Complete() был вызван.

Это код, который создает конвейер:

var EmpresasBuffer = new BatchBlock<EmpresaModel>(1000, 
                        new GroupingDataflowBlockOptions { 
                               EnsureOrdered = false, 
                               BoundedCapacity = 10000 });
var SaveBlock = new TransformBlock<EmpresaModel[], int>(
                        async x => await SalvarEmpresaAsync(x), 
                        new ExecutionDataflowBlockOptions { 
                               EnsureOrdered = false, 
                               MaxDegreeOfParallelism = 32, 
                               BoundedCapacity = 10000 });
car ExitBlock = new ActionBlock<int>(x => {
    Console.WriteLine($"[{DateTime.Now.ToString("dd/MM/yyyy HH:mm:ss")}] Registros Importados: {TotalRows += x}");
});

EmpresasBuffer.LinkTo(SaveBlock, new DataflowLinkOptions { PropagateCompletion = true });
SaveBlock.LinkTo(ExitBlock, new DataflowLinkOptions { PropagateCompletion = true });

И этот код я использую для передачи данных в EmpresasBuffer:

using (var reader = new StreamReader(stream))
{
    string linha;
    while ((linha = reader.ReadLine()) != null)
    {
        //{...creates empresa class...}
        await EmpresasBuffer.SendAsync(empresa);
    }
}

//{...after the end of loop...}

EmpresasBuffer.Complete();

await ExitBlock.Completion;

Console.WriteLine($"[{DateTime.Now.ToString("dd/MM/yyyy HH:mm:ss")}] Process finished!");

Что не так с моим кодом?

Это код SalvarEmpresaAsync:

public static async Task<int> SalvarEmpresaAsync(EmpresaModel[] modelArray)
{
    using (var Connection = new SqlConnection(ConnectionString))
    {
        Connection.Open();

        var TaskEmpresa = BulkInsert<EmpresaModel>(Connection, "Empresas", modelArray, PropertiesEmpresa);
        var TaskQuadroSocietario = BulkInsert<QuadroSocietarioModel>(Connection, "QuadroSocietario", modelArray.SelectMany(x => x.QuadrosSocietarios).AsEnumerable(), PropertiesQuadroSocietario);

        await TaskEmpresa;
        await TaskQuadroSocietario;
    }


    return modelArray.Length;
}


public static async Task BulkInsert<T>(SqlConnection connection, string tableName, IEnumerable<T> data, string[] columns) where T: class
{
     using (var BulkCopy = new SqlBulkCopy(connection))
     using (var reader = ObjectReader.Create(data, columns))
     {
         foreach (string property in columns)
         {
             BulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(property, property));
         }

         BulkCopy.DestinationTableName = tableName;
         await BulkCopy.WriteToServerAsync(reader);
     }
 }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...