В примере программы имеется следующий пакетный блок: new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });
, которому отправляется 60 элементов данных int и используется отдельная задача.
Проблема заключается в том, что await sourceBlock.SendAsync(i);
непохоже, ожидает, даже несмотря на то, что BatchBlock ограничил емкость, данные по-прежнему непрерывно отправляются, не занимая задачи по удалению каких-либо элементов.В конечном итоге BatchBlock получает только 2 пакета из 10 элементов данных int.Я ожидаю, что await sourceBlock.SendAsync(i);
приостановит выполнение при отправке 20 элементов, поскольку ограниченная емкость блока установлена равной 10 с максимумом из 2 групп.Затем в какой-то момент задача потребителя получит данные, и процесс повторится.
Я прикрепил приведенный ниже код, создаю простое консольное приложение, добавив следующее к основному:
BatchBlockIssueReplication().GetAwaiter().GetResult();
Метод для вызова:
public static async Task BatchBlockIssueReplication()
{
var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });
// Reading data from the source block
Task fireAndForget = Task.Run(async () =>
{
while (!sourceBlock.Completion.IsCanceled)
{
await Task.Delay(1500);
if (await sourceBlock.OutputAvailableAsync() && sourceBlock.TryReceiveAll(out var results))
{
Console.WriteLine("Received: ");
foreach (var result in results)
{
Console.Write($"{result.Length} ");
}
Console.WriteLine();
}
}
});
for (int i = 0; i < 60; i++)
{
Console.WriteLine($"Sending {i} to the source block");
await sourceBlock.SendAsync(i);
}
Console.WriteLine("Finished sending data to the source block");
await Task.Delay(10000);
}