Я согласен с другими, что TPL Dataflow звучит как хорошее решение для этого.
Чтобы ограничить обработку, вы можете создать TransformBlock
, который фактически не преобразует данные, а просто задерживает их, если они поступили слишком рано после предыдущих данных:
static IPropagatorBlock<T, T> CreateDelayBlock<T>(TimeSpan delay)
{
DateTime lastItem = DateTime.MinValue;
return new TransformBlock<T, T>(
async x =>
{
var waitTime = lastItem + delay - DateTime.UtcNow;
if (waitTime > TimeSpan.Zero)
await Task.Delay(waitTime);
lastItem = DateTime.UtcNow;
return x;
},
new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
}
Затем создайте метод, который производит данные (например, целые числа, начинающиеся с 0):
static async Task Producer(ITargetBlock<int> target)
{
int i = 0;
while (await target.SendAsync(i))
i++;
}
Он написан асинхронно, поэтому, если целевой блок не может обработать элементы прямо сейчас, он будет ждать.
Затем напишите потребительский метод:
static void Consumer(int i)
{
Console.WriteLine(i);
}
И, наконец, свяжите все это вместе и запустите:
var delayBlock = CreateDelayBlock<int>(TimeSpan.FromMilliseconds(500));
var consumerBlock = new ActionBlock<int>(
(Action<int>)Consumer,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
delayBlock.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });
Task.WaitAll(Producer(delayBlock), consumerBlock.Completion);
Здесь delayBlock
будет принимать не более одного элемента каждые 500 мс, а метод Consumer()
может запускаться несколько раз параллельно. Чтобы закончить обработку, позвоните delayBlock.Complete()
.
Если вы хотите добавить кеширование для вашего # 2, вы можете создать еще одну TransformBlock
и выполнить там работу и связать ее с другими блоками.