TPL DataFlow - Пакетирование по продолжительности или порогу - PullRequest
0 голосов
/ 03 октября 2018

Я реализовал шаблон "производитель ... потребитель", использующий поток данных TPL.Вариант использования: код читает сообщения с шины Kafka.Для эффективности нам нужно обрабатывать сообщения в пакетном режиме при переходе в базу данных.

Есть ли способ в потоке данных TPL удерживать сообщение и запускать при достижении порога размера или продолжительности?

Пример: текущая реализация отправляет сообщение, как только оно извлекается из очереди.

    postedSuccessfully = targetBuffer.Post(msg.Value);

Ответы [ 3 ]

0 голосов
/ 06 ноября 2018

Я думаю, вы могли бы использовать что-то вроде этого, в основном это просто BatchBlock с Timeout, все свернутые в один

BatchBlockEx

public sealed class BatchBlockEx<T> : IDataflowBlock, IPropagatorBlock<T, T[]>, ISourceBlock<T[]>, ITargetBlock<T>, IReceivableSourceBlock<T[]>
{
   private readonly AsyncAutoResetEvent _asyncAutoResetEvent = new AsyncAutoResetEvent();

   private readonly BatchBlock<T> _base;

   private readonly CancellationToken _cancellationToken;

   private readonly int _triggerTimeMs;

   public BatchBlockEx(int batchSize, int triggerTimeMs)
   {
      _triggerTimeMs = triggerTimeMs;
      _base = new BatchBlock<T>(batchSize);
      PollReTrigger();
   }

   public BatchBlockEx(int batchSize, int triggerTimeMs, GroupingDataflowBlockOptions dataflowBlockOptions)
   {
      _triggerTimeMs = triggerTimeMs;
      _cancellationToken = dataflowBlockOptions.CancellationToken;
      _base = new BatchBlock<T>(batchSize, dataflowBlockOptions);
      PollReTrigger();
   }

   public int BatchSize => _base.BatchSize;

   public int OutputCount => _base.OutputCount;

   public Task Completion => _base.Completion;

   public void Complete() => _base.Complete();

   void IDataflowBlock.Fault(Exception exception) => ((IDataflowBlock)_base).Fault(exception);

   public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions) => _base.LinkTo(target, linkOptions);

   T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out bool messageConsumed) => ((ISourceBlock<T[]>)_base).ConsumeMessage(messageHeader, target, out messageConsumed);

   void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReleaseReservation(messageHeader, target);

   bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReserveMessage(messageHeader, target);

   DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
   {
      _asyncAutoResetEvent.Set();
      return ((ITargetBlock<T>)_base).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
   }

   public bool TryReceive(Predicate<T[]> filter, out T[] item) => _base.TryReceive(filter, out item);

   public bool TryReceiveAll(out IList<T[]> items) => _base.TryReceiveAll(out items);

   public override string ToString() => _base.ToString();

   public void TriggerBatch() => _base.TriggerBatch();

   private void PollReTrigger()
   {
      async Task Poll()
      {
         try
         {
            while (!_cancellationToken.IsCancellationRequested)
            {
               await _asyncAutoResetEvent.WaitAsync()
                                          .ConfigureAwait(false);

               await Task.Delay(_triggerTimeMs, _cancellationToken)
                           .ConfigureAwait(false); 
               TriggerBatch();
            }
         }
         catch (TaskCanceledException)
         {
            // nope
         }
      }

      Task.Run(Poll, _cancellationToken);
   }
}

AsyncAutoResetEvent

public class AsyncAutoResetEvent
{
   private static readonly Task _completed = Task.FromResult(true);
   private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
   private bool _signaled;

   public Task WaitAsync()
   {
      lock (_waits)
      {
         if (_signaled)
         {
            _signaled = false;
            return _completed;
         }

         var tcs = new TaskCompletionSource<bool>();
         _waits.Enqueue(tcs);
         return tcs.Task;
      }
   }

   public void Set()
   {
      TaskCompletionSource<bool> toRelease = null;

      lock (_waits)
         if (_waits.Count > 0)
            toRelease = _waits.Dequeue();
         else if (!_signaled)
            _signaled = true;

      toRelease?.SetResult(true);
   }
}
0 голосов
/ 06 ноября 2018

Буферизация по количеству и продолжительности уже доступна через System.Reactive и, в частности, оператор Buffer .Буфер собирает входящие события до тех пор, пока не будет достигнут желаемый счетчик или пока не истечет его временной интервал.

Блоки потока данных предназначены для работы с System.Reactive.Блоки можно преобразовать в Observables и Observers с помощью методов расширения DataflowBlock.AsObservable () и AsObserver () .

Это делает создание блока буферизации очень простым:

public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
    var inBlock = new BufferBlock<TIn>();
    var outBlock = new BufferBlock<IList<TIn>>();

    var outObserver=outBlock.AsObserver();
    inBlock.AsObservable()
            .Buffer(timeSpan, count)
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(outObserver);

    return DataflowBlock.Encapsulate(inBlock, outBlock);

}

Этот метод использует два блока буфера для буферизации входов и выходов.Buffer() читает из входного блока (наблюдаемый) и записывает в выходной блок (наблюдатель), когда пакет заполнен или истекает временной интервал.

По умолчанию Rx работает в текущем потоке.Вызывая ObserveOn(TaskPoolScheduler.Default), мы говорим ему обрабатывать данные в потоке пула задач.

Пример

Этот код создает буферный блок для 5 элементов или 1 секунды.Он начинает с публикации 7 элементов, ждет 1,1 секунды, затем публикует еще 7 элементов.Каждая партия записывается на консоль вместе с идентификатором потока:

static async Task Main(string[] args)
{
    //Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    //Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

    for (int i=0;i<7;i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i=7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }
    bufferBlock.Complete();
    Console.WriteLine($"Finishing");
    await bufferBlock.Completion;
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}

Вывод:

Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
0 голосов
/ 03 октября 2018

Несмотря на то, что тайм-аут из коробки отсутствует, вы можете подключить таймер к TriggerBatch всякий раз, когда нисходящий конвейер достаточно долго ждал пакета.Затем сбросьте таймер, когда партия будет пропущена.BatchBlock позаботится обо всем остальном за вас.

Теперь, например, этот образец был настроен так, чтобы каждый раз размер пакета составлял 1, даже если пакетный блок обычно ожидает 10 элементов.Тайм-аут заставляет опустошить то, что в данный момент хранится в BatchBlock

public class BatchBlockExample
{
    [Test]
    public async Task BatchBlockWithTimeOut()
    {
        var batchBlock = new BatchBlock<int>(10);

        var timeOut = TimeSpan.FromSeconds(1);
        var timeOutTimer = new System.Timers.Timer(timeOut.TotalMilliseconds);
        timeOutTimer.Elapsed += (s, e) => batchBlock.TriggerBatch();            

        var actionBlock = new ActionBlock<IEnumerable<int>>(x =>
        {
            //Reset the timeout since we got a batch
            timeOutTimer.Stop();
            timeOutTimer.Start();
            Console.WriteLine($"Batch Size: {x.Count()}");
        });

        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        timeOutTimer.Start();

        foreach(var item in Enumerable.Range(0, 5))
        {
            await Task.Delay(2000);
            await batchBlock.SendAsync(item);
        }

        batchBlock.Complete();
        await actionBlock.Completion;
    }
}

Вывод:

Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
...