await SendAsync не ожидает TPL Dataflow BatchBlock - PullRequest
0 голосов
/ 08 сентября 2018

В примере программы имеется следующий пакетный блок: new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });, которому отправляется 60 элементов данных int и используется отдельная задача.

Проблема заключается в том, что await sourceBlock.SendAsync(i); непохоже, ожидает, даже несмотря на то, что BatchBlock ограничил емкость, данные по-прежнему непрерывно отправляются, не занимая задачи по удалению каких-либо элементов.В конечном итоге BatchBlock получает только 2 пакета из 10 элементов данных int.Я ожидаю, что await sourceBlock.SendAsync(i); приостановит выполнение при отправке 20 элементов, поскольку ограниченная емкость блока установлена ​​равной 10 с максимумом из 2 групп.Затем в какой-то момент задача потребителя получит данные, и процесс повторится.

Я прикрепил приведенный ниже код, создаю простое консольное приложение, добавив следующее к основному:

BatchBlockIssueReplication().GetAwaiter().GetResult();

Метод для вызова:

    public static async Task BatchBlockIssueReplication()
    {
        var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions { MaxNumberOfGroups = 2 });

        // Reading data from the source block
        Task fireAndForget = Task.Run(async () =>
        {
            while (!sourceBlock.Completion.IsCanceled)
            {
                await Task.Delay(1500);
                if (await sourceBlock.OutputAvailableAsync() && sourceBlock.TryReceiveAll(out var results))
                {
                    Console.WriteLine("Received: ");
                    foreach (var result in results)
                    {
                        Console.Write($"{result.Length} ");
                    }
                    Console.WriteLine();
                }
            }
        });

        for (int i = 0; i < 60; i++)
        {
            Console.WriteLine($"Sending {i} to the source block");
            await sourceBlock.SendAsync(i);
        }
        Console.WriteLine("Finished sending data to the source block");

        await Task.Delay(10000);
    }

Ответы [ 2 ]

0 голосов
/ 12 сентября 2018

Вы не установили BoundedCapacity , который контролирует, сколько элементов может ожидать в буфере input . Превышение этого приведет к SendAsync ожиданию.

Вы устанавливаете свойство MaxNumberOfGroups , которое указывает, сколько групп будет сгенерировано этим блоком, прежде чем отказаться от получения каких-либо других входных данных.

Из документов:

Получает или задает максимальное количество групп, которые должны быть сгенерированы блоком.

Если вы хотите, чтобы ваш блок сохранял, например, 20 блоков во входном буфере и ожидал, вы должны установить BoundedCapacity:

var sourceBlock = new BatchBlock<int>(10, new GroupingDataflowBlockOptions 
                                          { 
                                              BoundedCapacity = 20 
                                          });
0 голосов
/ 11 сентября 2018

После достижения максимума await sourceBlock.SendAsync(i); не собирается останавливаться, так как блок активно отказывается от большего количества предметов. Когда это происходит, SendAsync возвращает false, указывая, что блок не будет принимать новые сообщения. Если вы напишите результат вызова SendAsync, вы увидите, где блок прекращает принимать новые сообщения:

Sending 0 to the source block
True
Sending 1 to the source block
True
Sending 2 to the source block
True
Sending 3 to the source block
True
Sending 4 to the source block
True
Sending 5 to the source block
True
Sending 6 to the source block
True
Sending 7 to the source block
True
Sending 8 to the source block
True
Sending 9 to the source block
True
Sending 10 to the source block
True
Sending 11 to the source block
True
Sending 12 to the source block
True
Sending 13 to the source block
True
Sending 14 to the source block
True
Sending 15 to the source block
True
Sending 16 to the source block
True
Sending 17 to the source block
True
Sending 18 to the source block
True
Sending 19 to the source block
True
Sending 20 to the source block
False
Sending 21 to the source block
False
Sending 22 to the source block
False
Sending 23 to the source block
False
Sending 24 to the source block
False
Sending 25 to the source block
False
Sending 26 to the source block
False
Sending 27 to the source block
False
Sending 28 to the source block
False
Sending 29 to the source block
False
Sending 30 to the source block
False
Sending 31 to the source block
False
Sending 32 to the source block
False
Sending 33 to the source block
False
Sending 34 to the source block
False
Sending 35 to the source block
False
Sending 36 to the source block
False
Sending 37 to the source block
False
Sending 38 to the source block
False
Sending 39 to the source block
False
Sending 40 to the source block
False
Sending 41 to the source block
False
Sending 42 to the source block
False
Sending 43 to the source block
False
Sending 44 to the source block
False
Sending 45 to the source block
False
Sending 46 to the source block
False
Sending 47 to the source block
False
Sending 48 to the source block
False
Sending 49 to the source block
False
Sending 50 to the source block
False
Sending 51 to the source block
False
Sending 52 to the source block
False
Sending 53 to the source block
False
Sending 54 to the source block
False
Sending 55 to the source block
False
Sending 56 to the source block
False
Sending 57 to the source block
False
Sending 58 to the source block
False
Sending 59 to the source block
False
Finished sending data to the source block
Received: 
10 10
...