Если вы заранее знаете свои источники, я бы использовал JoinBlock
вместе с TransformBlock
. Вам нужно будет создать BufferBlock
для каждого источника.
Сначала JoinBlock
ожидает по одному сообщению от каждого источника и упаковывает их в один кортеж. Затем TransformBlock
создает массив результатов из промежуточного кортежа.
Если вы заранее не знаете своих источников, вам нужно объяснить, как вы ожидаете, что ваш новый блок узнает, когда будет получен результат. Затем эту логику следует поместить в пользовательский блок, вероятно, в виде TransformManyBlock<string,string[]>
.
Если вы хотите объединить динамическое число источников, вы можете создать блок неограниченного объединения, например:
private static void Main()
{
var source1 = new BufferBlock<string>();
var source2 = new BufferBlock<string>();
var source3 = new BufferBlock<string>();
var aggregator = CreateAggregatorBlock( 3 );
var result = new ActionBlock<string[]>( x => Console.WriteLine( string.Join( ", ", x ) ) );
source1.LinkTo( aggregator );
source2.LinkTo( aggregator );
source3.LinkTo( aggregator );
aggregator.LinkTo( result );
source1.Post( "message 1" );
source2.Post( "message 2" );
source3.Post( "message 3" );
Console.ReadLine();
}
private static TransformManyBlock<string, string[]> CreateAggregatorBlock( int sources )
{
var buffer = new List<string>();
return new TransformManyBlock<string, string[]>( message => {
buffer.Add( message );
if( buffer.Count == sources )
{
var result = buffer.ToArray();
buffer.Clear();
return new[] {result};
}
return Enumerable.Empty<string[]>();
} );
}
Предполагается, что ваши источники генерируют сообщения с одинаковой скоростью. Если это не так, вам нужно указать идентификатор источника рядом с сообщением и иметь буфер для каждого источника.