Я пытаюсь понять, как BatchBlocks
и ActionBlocks
работают в .net DataFlow
У меня есть ActionBlock
Давайте назовем его A1, который подается из цикла, A1 будет извлекать некоторые записи из API в асинхронном режиме. Как только результаты возвращаются, он отправляет результаты обратно в «BatchBlock» (B1) с размером пакета 5, который объединяет результаты и передает их в другое действие (A2)
Теперь я ожидаю, что как только А1 получит 5 результатов, А2 должен быть вызван с результатом [4]. Это происходит, но не сразу, между тем, как А1 отправляет 5 товаров в В1, а затем В1 вызывает А2.
не уверен, почему так много ожидания.
A1--
var i = 0;
action = new ActionBlock<D1>(async d1 =>
{
var results = await QueryClientAsync(d1);
if (results != null)
{
await outgoingQueue.SendAsync(new Tuple<D1, IEnumerable<ScannedRecord>>(d1, GetRecordsFromQuery(d1.Name, results)));
Console.WriteLine($"Received {i++} {d1.Name} {DateTime.Now}");
}
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxDegreeOfParallellismForQuery });
, где OutgoingQueue
-
outgoingQueue = new BatchBlock<Tuple<D1, IEnumerable<ScannedRecord>>>(BatchSizeForOutgoingCollection); // batch size = 5
и outgoingQueue связан вот так с A2
-
var actionToProcessIncomingItems = new ActionBlock<Tuple<D1, IEnumerable<ScannedRecord>>[]>(
ds =>
{
Console.WriteLine($"Batch Recieved at {DateTime.Now}!");
ProcessCommonItems(ds);
}
, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxDegreeOfParalleismToProcessRecievedItemsForScanner });
outgoingQueue.LinkTo(actionToProcessIncomingItems, new DataflowLinkOptions { PropagateCompletion = true });