TPL Dataflow создает объединенный массив результатов из всех входящих узлов (несколько производителей, 1 потребитель) - PullRequest
0 голосов
/ 29 августа 2018

Обратите внимание на следующий пример кода. Мне нужен узел агрегатора, который может быть связан с любым количеством источников, ждет, пока все источники отправят одно сообщение, а затем объединит их в результате [].

Это должно быть очевидно и прямо вперед, но почему-то я не нахожу решения. Я проверил JoinBlock и TransformaterBlock, но оба кажутся неподходящими.

using System;
using System.Threading.Tasks.Dataflow;

namespace ConsoleApp2
{
    internal class Program
    {
        private static readonly uint _produceCount = 0;
        private static void Main(string[] args)
        {

            BufferBlock<string> p1 = new BufferBlock<string>();
            BufferBlock<string> p2 = new BufferBlock<string>();

            // a block is required that accepts n sources as input, waits for all inputs to arrive, and then creates a result array from all inputs

            ActionBlock<string[]> c1 = new ActionBlock<string[]>((inputs) =>
            {
                Console.WriteLine(String.Join(',', inputs));
            });

            p1.Post("Produce 1.1");
            p2.Post("Produce 2.1");

            // desired output:
            // "Produce 1.1, Produce 2.1"
            // actually the order is of no importance at this time

        }


    }
}

[Изменить] Дальнейшее уточнение: Я хотел бы иметь блок, который: - динамически ожидать все исходные заметки (в момент времени, когда поступает первое сообщение), чтобы завершить и агрегировать результат для передачи на узлы-последователи

Ответы [ 2 ]

0 голосов
/ 29 августа 2018

Вы можете использовать не жадный BatchBlock для этого. Будучи не жадным, каждый источник внесет один элемент в пакет. Это было , первоначально предложенное здесь . И вот проверенный пример: Обратите внимание, что в качестве доказательства source1 отправляется несколько элементов, которые не отображаются в пакете:

public class DataAggregator
{
    private BatchBlock<string> batchBlock = new BatchBlock<string>(5, new GroupingDataflowBlockOptions() { Greedy = false });
    private ActionBlock<string[]> writer = new ActionBlock<string[]>(strings => strings.ToList().ForEach(str => Console.WriteLine(str)));
    private BufferBlock<string> source1 = new BufferBlock<string>();
    private BufferBlock<string> source2 = new BufferBlock<string>();
    private BufferBlock<string> source3 = new BufferBlock<string>();
    private BufferBlock<string> source4 = new BufferBlock<string>();
    private BufferBlock<string> source5 = new BufferBlock<string>();

    public DataAggregator()
    {
        source1.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source2.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source3.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source4.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        source5.LinkTo(batchBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        batchBlock.LinkTo(writer, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    [Test]
    public async Task TestPipeline()
    {
        source1.Post("string1-1");
        source1.Post("string1-2");
        source1.Post("string1-3");
        source2.Post("string2-1");
        source3.Post("string3-1");
        source4.Post("string4-1");
        source5.Post("string5-1");
        //Should print string1-1 string2-1 string3-1 string4-1 string5-1
        source1.Complete();
        source2.Complete();
        source3.Complete();
        source4.Complete();
        source5.Complete();
        await writer.Completion;
    }
}

Выход:

string1-1
string2-1
string3-1
string4-1
string5-1
0 голосов
/ 29 августа 2018

Если вы заранее знаете свои источники, я бы использовал JoinBlock вместе с TransformBlock. Вам нужно будет создать BufferBlock для каждого источника.

Сначала JoinBlock ожидает по одному сообщению от каждого источника и упаковывает их в один кортеж. Затем TransformBlock создает массив результатов из промежуточного кортежа.

Если вы заранее не знаете своих источников, вам нужно объяснить, как вы ожидаете, что ваш новый блок узнает, когда будет получен результат. Затем эту логику следует поместить в пользовательский блок, вероятно, в виде TransformManyBlock<string,string[]>.

Если вы хотите объединить динамическое число источников, вы можете создать блок неограниченного объединения, например:

private static void Main()
{
    var source1 = new BufferBlock<string>();
    var source2 = new BufferBlock<string>();
    var source3 = new BufferBlock<string>();
    var aggregator = CreateAggregatorBlock( 3 );
    var result = new ActionBlock<string[]>( x => Console.WriteLine( string.Join( ", ", x ) ) );
    source1.LinkTo( aggregator );
    source2.LinkTo( aggregator );
    source3.LinkTo( aggregator );
    aggregator.LinkTo( result );

    source1.Post( "message 1" );
    source2.Post( "message 2" );
    source3.Post( "message 3" );

    Console.ReadLine();
}

private static TransformManyBlock<string, string[]> CreateAggregatorBlock( int sources )
{
    var buffer = new List<string>();
    return new TransformManyBlock<string, string[]>( message => {
        buffer.Add( message );
        if( buffer.Count == sources )
        {
            var result = buffer.ToArray();
            buffer.Clear();
            return new[] {result};
        }
        return Enumerable.Empty<string[]>();
    } );
}

Предполагается, что ваши источники генерируют сообщения с одинаковой скоростью. Если это не так, вам нужно указать идентификатор источника рядом с сообщением и иметь буфер для каждого источника.

...