Буферизация по количеству и продолжительности уже доступна через System.Reactive и, в частности, оператор Buffer .Буфер собирает входящие события до тех пор, пока не будет достигнут желаемый счетчик или пока не истечет его временной интервал.
Блоки потока данных предназначены для работы с System.Reactive.Блоки можно преобразовать в Observables и Observers с помощью методов расширения DataflowBlock.AsObservable () и AsObserver () .
Это делает создание блока буферизации очень простым:
public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
var inBlock = new BufferBlock<TIn>();
var outBlock = new BufferBlock<IList<TIn>>();
var outObserver=outBlock.AsObserver();
inBlock.AsObservable()
.Buffer(timeSpan, count)
.ObserveOn(TaskPoolScheduler.Default)
.Subscribe(outObserver);
return DataflowBlock.Encapsulate(inBlock, outBlock);
}
Этот метод использует два блока буфера для буферизации входов и выходов.Buffer()
читает из входного блока (наблюдаемый) и записывает в выходной блок (наблюдатель), когда пакет заполнен или истекает временной интервал.
По умолчанию Rx работает в текущем потоке.Вызывая ObserveOn(TaskPoolScheduler.Default)
, мы говорим ему обрабатывать данные в потоке пула задач.
Пример
Этот код создает буферный блок для 5 элементов или 1 секунды.Он начинает с публикации 7 элементов, ждет 1,1 секунды, затем публикует еще 7 элементов.Каждая партия записывается на консоль вместе с идентификатором потока:
static async Task Main(string[] args)
{
//Build the pipeline
var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);
var options = new DataflowLinkOptions { PropagateCompletion = true };
var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
bufferBlock.LinkTo(printBlock, options);
//Start the messages
Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");
for (int i=0;i<7;i++)
{
bufferBlock.Post(i.ToString());
}
await Task.Delay(1100);
for (int i=7; i < 14; i++)
{
bufferBlock.Post(i.ToString());
}
bufferBlock.Complete();
Console.WriteLine($"Finishing");
await bufferBlock.Completion;
Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
Console.ReadKey();
}
static void printOut(IEnumerable<string> items)
{
var line = String.Join(",", items);
Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}
Вывод:
Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6