TPL Dataflow C# дождитесь завершения всех связанных блоков - PullRequest
1 голос
/ 05 мая 2020

Я использую поток данных TPL для построения конвейера. Этот конвейер должен логически выполнять следующие действия:

  1. Начать с обработки нескольких элементов данных - скажем, pollingBlock.
  2. В случае выполнения определенных условий проход один элементов (удовлетворяющих условиям) в определенный блок для дальнейшего мониторинга, допустим, это monitoringBlock. Каждый monitoringBlock может содержать только 1 элемент, но их несколько monitoringBlocks.
  3. pollingBlock должны продолжать обрабатывать все элементы, включая отправленный, как while (true).
  4. monitoringBlocks в то время как занято не должно принимать никаких других сообщений, и эти сообщения должны быть просто удалены без дальнейшей обработки.
  5. После некоторой обработки в monitoringBlock сообщение должно быть либо помечено как завершенное, либо передаваться в следующий блок для обработки следующий блок: processingBlock

Краткий пример:

public Task ExecutePipeline()
{
    var block = CreatePollingPipeline();
    block.Post((_serviceOne, _serviceTwo));

    block.Complete();
    return block.Completion;
}

public ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)> CreatePollingPipeline()
{
    var pollingAlertHolder = new BufferBlock<(string input1, string input2)>();

    var pollingBlock = new ActionBlock<(IServiceOne serviceOne, IServiceTwo serviceTwo)>(services =>
    {
        while (true)
        {
            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP2", "INPVAL2"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP1", "INPVAL"));
            Thread.Sleep(2000);

            Console.WriteLine("Posting to alert block");
            pollingAlertHolder.Post(("INP2", "INPVAL2"));
            Thread.Sleep(2000);
        }
    });

    var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
        {
            Console.WriteLine("monitoringBlock started");
            Thread.Sleep(5000);
            Console.WriteLine("monitoringBlock completed");

            return (inputs.input1, inputs.input2);
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });

    pollingAlertHolder.LinkTo(monitoringBlock, new DataflowLinkOptions() { PropagateCompletion = true },
        inputs => inputs.input1 == "INP1" && inputs.input2 == "INPVAL");
    pollingAlertHolder.LinkTo(DataflowBlock.NullTarget<(string input1, string input2)>());

    var processingBlock = new ActionBlock<(string input1, string input2)>(i =>
    {
        Console.WriteLine("processingBlock started");
        Thread.Sleep(2000);
        Console.WriteLine("processingBlock completed");
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
    monitoringBlock.LinkTo(processingBlock, new DataflowLinkOptions { PropagateCompletion = true });


    return pollingBlock;
}

Мой вопрос: как мне сохранить monitoringBlock занятым, пока связанный processingBlock не завершит свое работа? Я не хочу, чтобы какие-либо элементы отправлялись на monitoringBlock до того, как сообщение завершит цикл обработки FULL .

1 Ответ

1 голос
/ 06 мая 2020

Как уже упоминалось в комментариях, вы можете просто инкапсулировать logi c из monitoringBlock и processingBlock в один блок, например, вы можете добиться этого с помощью предопределенного метода Datablock.Encapsulate.

Однако, если вы не хотите этого делать, вы можете использовать AutoResetEvent или аналогичную абстракцию, и ваш код может быть таким:

AutoResetEvent dataflowEvent = new AutoResetEvent(true);
var bufferBlock = new ActionBLock<(string input1, string input2)>(i =>
{
    dataflowEvent.WaitOne();
    monitoringBlock.Post(i);
});
var monitoringBlock = new TransformBlock<(string input1, string input2), (string input1, string input2)>(inputs =>
    {
        Console.WriteLine("monitoringBlock started");
        Thread.Sleep(5000);
        Console.WriteLine("monitoringBlock completed");

        dataflowEvent.Set();
        return (inputs.input1, inputs.input2);
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 1 });
...