Порядок вывода выполнения потока данных TPL - PullRequest
0 голосов
/ 10 октября 2018

В приведенном ниже тестовом коде я ожидаю этого результата:

1, 2000
2, 4000
3, 6000

Однако фактический результат:

3, 6000    
2, 4000
1, 2000

Более того, я вижу результат на экране только после 6секунд.Что означает, что все вводимые данные ожидают и обрабатываются на следующем этапе.

Как я могу сделать так, чтобы конвейер выдавал результат на вход сразу после его завершения?

    public static void Run()
    {
        Func<int, string> fn = n =>
        {
            var sleep = n * 2000;
            Thread.Sleep(sleep);
            return n + ", " + sleep;
        };

        var opts = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4
        };

        var transformBlock = new TransformBlock<int, string>(fn, opts);
        var bufferBlock = new BufferBlock<string>(opts);

        transformBlock.LinkTo(bufferBlock, new DataflowLinkOptions { PropagateCompletion = true });

        for (var i = 3; i > 0; i--)
            transformBlock.Post(i);

        Console.WriteLine(bufferBlock.Receive());
        Console.WriteLine(bufferBlock.Receive());
        Console.WriteLine(bufferBlock.Receive());
    }

1 Ответ

0 голосов
/ 10 октября 2018

По умолчанию поток данных сохраняет порядок сообщений, даже если они обрабатываются параллельно.

Чтобы преобразовать сообщения как можно быстрее, т. Е. Не в порядке, установите EnsureOrdered в false впараметры вашего TransformBlock.

Обязательно используйте актуальную версию потока данных (в настоящее время пакет nuget System.Threading.Tasks.Dataflow существует в версии 4.9), так как EnsureOrdered не всегда был рядом.

Пример:

class Program
{
    static void Main( string[] args )
    {
        var transformBlock = new TransformBlock<int, int>( x =>
        {
            Thread.Sleep( x );
            return x;
        }, new ExecutionDataflowBlockOptions {EnsureOrdered = false, MaxDegreeOfParallelism = 10} );
        var actionBlock = new ActionBlock<int>( x => Console.WriteLine( x ) );
        transformBlock.LinkTo( actionBlock, new DataflowLinkOptions {PropagateCompletion = true});
        for( var i = 9; i > 0; i-- )
            transformBlock.Post( i * 1000 );
        transformBlock.Complete();
        actionBlock.Completion.Wait();
        Console.ReadLine();
    }
}

Это выводит

1000
2000
3000
4000
5000
6000
7000
8000
9000
...