Как корректно отключить TPL Dataflow при фатальном исключении? - PullRequest
2 голосов
/ 28 февраля 2020

Я использую последовательную конвейерную сборку на потоке данных TPL, которая состоит из 3 блоков:

  • B1 - готовит сообщение
  • B2 - отправляет сообщение на удаленный сервис
  • B3 - сохраняет результат

Проблема заключается в том, как отключить конвейер, когда возникает ошибка, такая как отключение службы. Трубопровод должен go отключаться контролируемым образом, поэтому результаты от B2 не будут потеряны.

Ответы [ 2 ]

1 голос
/ 28 февраля 2020

Когда блок потока данных завершает , это означает, что:

[...] он не должен принимать, генерировать больше сообщений или потреблять больше отложенных сообщений.

Если вы хотите, чтобы все сообщения, которые были успешно обработаны b2, были обработаны bFinal, то есть три вещи, которые вы не должны делать:

  1. Не определяйте блок bFinal с помощью BoundedCapacity.
  2. Не отменяйте блок bFinal.
  3. Не распространяйте завершение от b2 до bFinal.

Если вы ограничите bFinal, и случится так, что b2 быстрее обрабатывает сообщения, тогда, если b2 не удастся, обработанные сообщения будут храниться в его выходном буфере. Эти сообщения не будут предлагаться связанному блоку bFinal. Эти сообщения будут потеряны.

Если вы выполните любое из 2 или 3 (отмените bFinal или распространите завершение сбойного блока b2 на bFinal), тогда блок bFinal не будет обрабатывать сообщения, которые хранятся в его входном буфере. Он будет ждать завершения сообщений, которые в данный момент выполняются, а затем завершит себя в отмененном или сбойном состоянии, отбрасывая сообщения в своем входном буфере. Так что не делайте этого:

b2.LinkTo(bFinal, new DataflowLinkOptions { PropagateCompletion = true });

Сделайте это вместо:

b2.LinkTo(bFinal);
b2.PropagateCompletionAlwaysSuccessful(bFinal);

Вот метод расширения PropagateCompletionAlwaysSuccessful:

public static async void PropagateCompletionAlwaysSuccessful(this IDataflowBlock source,
    IDataflowBlock target)
{
    try
    {
        await source.Completion.ConfigureAwait(false);
    }
    catch
    {
        // Ignore exception
    }
    finally
    {
        target.Complete();
    }
}

Для обработки любого исключения, которые произошли в любом из блоков b2 или bFinal, сделайте это в конце:

await Task.WhenAll(b2.Completion, bFinal.Completion);
1 голос
/ 28 февраля 2020

Решение было простым, но мне потребовалось несколько раундов, прежде чем я его получил, поскольку за основной информацией о библиотеке c на сайте Microsoft не так много информации.

Надеюсь, она кому-нибудь поможет. Решение может быть легко перенастроено для удовлетворения других требований.

Представленный подход основан на:

  • CancellationTokenSource для сигнализации отключения. Каждый блок, в случае фатального исключения, должен сигнализировать об отключении через общий CancellationTokenSource объект.
  • Блоки, которые должны перестать работать сразу после инициализации сигнала, передавая общий CancellationTokenSource объект
  • Программа должна дождаться, пока последний блок завершит всю обработку сообщений.

Здесь решение в классе конвейера и тест, доказывающий его работоспособность.

Здесь работает пример:

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;
using System.Threading.Tasks;
using System.Diagnostics;

namespace Tests.Sets.Research
{
    [TestClass]
    public class TPLTest
    {
        public class PipeLine
        {
            CancellationTokenSource cancellationTokenSource;
            TransformBlock<int, int> b1, b2;
            ActionBlock<int> bFinal;

            static int SimulateWork(String blockName, int message, CancellationTokenSource cancellationTokenSource)
            {
                try
                {
                    Thread.Sleep(100);
                    Trace.WriteLine($"{blockName} processed: {message}");
                }
                catch (Exception ex)
                {
                    Trace.WriteLine($"Fatal error {ex.Message} at {blockName}");
                    cancellationTokenSource.Cancel();
                }
                return message;
            }


            public PipeLine(CancellationTokenSource cancellationTokenSource)
            {
                this.cancellationTokenSource = cancellationTokenSource;

                // Create three TransformBlock<int, int> objects. 
                // Each blocks <int, int> object calls the SimulateWork method.
                Func<string, int, CancellationTokenSource, int> doWork = (name, message, ct) => SimulateWork(name, message, ct);

                b1 = new TransformBlock<int, int>((m1) => doWork("b1", m1, cancellationTokenSource),
                   new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 , CancellationToken = cancellationTokenSource.Token}); //discard messages on  this block if cancel is signaled
                b2 = new TransformBlock<int, int>((m1) => doWork("b2", m1, cancellationTokenSource), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });
                bFinal = new ActionBlock<int>((m1) => doWork("bFinal", m1, cancellationTokenSource), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

                b1.LinkTo(b2, new DataflowLinkOptions { PropagateCompletion = true });
                b2.LinkTo(bFinal, new DataflowLinkOptions { PropagateCompletion = true });
            }

            internal void Complete()
            {
                b1.Complete();
            }

            public void waifForCompletetion()
            {               
                Trace.WriteLine($"Waiting for pipeline to end gracefully");
                bFinal.Completion.Wait();
                Trace.WriteLine($"Pipeline terminated");               
            }

            public void submitToPipe(int message)
            {
                if (cancellationTokenSource.IsCancellationRequested)
                {
                    Trace.WriteLine($"Message {message} was rejected. Pipe is shutting down.Throttling meanwhile");
                    return;
                }
                b1.SendAsync(message);
            }
        }

        [TestMethod]
        public void TestShutdown()
        {
            var cancellationTokenSource = new CancellationTokenSource();
            var pipeLine = new PipeLine(cancellationTokenSource);

            //post failure in 2 seconds. 
            //It would be the same if was signal from inside block 2
            Task.Factory.StartNew(async () =>
            {
                await Task.Delay(2000);
                Console.WriteLine("Time to shutdown the pipeline!");
                cancellationTokenSource.Cancel();
            });

            //send requests to pipe in background for 5 seconds
            Task.Run(async () =>
            {
                for (int i = 1; i < 100; i++)
                {
                    if (cancellationTokenSource.IsCancellationRequested)
                        break;

                    Thread.Sleep(50); //to see pipe closing input
                    pipeLine.submitToPipe(i);
                }
                pipeLine.Complete();
            });

            pipeLine.waifForCompletetion();
        }
    }
}

Вот результат:

b2 processed: 13
b1 processed: 22
Message 45 was rejected. Pipe is shutting down.Throttling meanwhile 
b2 processed: 14
bFinal processed: 8
b2 processed: 15
bFinal processed: 9
bFinal processed: 10
bFinal processed: 11
bFinal processed: 12
bFinal processed: 13
bFinal processed: 14
bFinal processed: 15
Pipeline terminated

С того момента, как Сообщение 45 было отклонено, больше сообщений не было обработано на B1.

Все сообщения уже в Очередь B2 достигла конца конвейера.

...