Я использую поток данных TPL для построения конвейера. Этот конвейер должен логически выполнять следующие действия:
- Начать с обработки нескольких элементов данных - скажем,
pollingBlock
. - В случае выполнения определенных условий проход один элементов (удовлетворяющих условиям) в определенный блок для дальнейшего мониторинга, допустим, это
monitoringBlock
. Каждый monitoringBlock
может содержать только 1 элемент, но их несколько monitoringBlocks
. pollingBlock
должны продолжать обрабатывать все элементы, включая отправленный, как while (true)
. monitoringBlocks
в то время как занято не должно принимать никаких других сообщений, и эти сообщения должны быть просто удалены без дальнейшей обработки. - После некоторой обработки в
monitoringBlock
сообщение должно быть либо помечено как завершенное, либо передаваться в следующий блок для обработки следующий блок: processingBlock
Краткий пример:
public Task ExecutePipeline()
{
var block = CreatePollingPipeline();
block.Post((_serviceOne, _serviceTwo));
block.Complete();
return block.Completion;
}
public ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)> CreatePollingPipeline()
{
var pollingAlertHolder = new BufferBlock<(string input1, string input2)>();
var pollingBlock = new ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)>(services =>
{
while (true)
{
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP2", "INPVAL2"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP1", "INPVAL"));
Thread.Sleep(2000);
Console.WriteLine("Posting to alert block");
pollingAlertHolder.Post(("INP2", "INPVAL2"));
Thread.Sleep(2000);
}
});
var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
{
Console.WriteLine("monitoringBlock started");
Thread.Sleep(5000);
Console.WriteLine("monitoringBlock completed");
return (inputs.input1, inputs.input2);
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
pollingAlertHolder.LinkTo(monitoringBlock, new DataflowLinkOptions() { PropagateCompletion = true },
inputs => inputs.input1 == "INP1" && inputs.input2 == "INPVAL");
pollingAlertHolder.LinkTo(DataflowBlock.NullTarget<(string input1, string input2)>());
var processingBlock = new ActionBlock<(string input1, string input2)>(i =>
{
Console.WriteLine("processingBlock started");
Thread.Sleep(2000);
Console.WriteLine("processingBlock completed");
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
monitoringBlock.LinkTo(processingBlock, new DataflowLinkOptions { PropagateCompletion = true });
return pollingBlock;
}
Мой вопрос: как мне сохранить monitoringBlock
занятым, пока связанный processingBlock
не завершит свое работа? Я не хочу, чтобы какие-либо элементы отправлялись на monitoringBlock
до того, как сообщение завершит цикл обработки FULL .