Обрабатывать исключения с помощью блоков потока данных TPL - PullRequest
0 голосов
/ 09 июля 2019

У меня есть простой поток данных TPL, который в основном выполняет некоторые задачи. Я заметил, что когда есть исключение в любом из блоков данных, оно не попадало в исходный вызывающий родительский блок. Я добавил некоторый ручной код для проверки исключений, но это не похоже на правильный подход.

if (readBlock.Completion.Exception != null || saveBlockJoinedProcess.Completion.Exception != null || processBlock1.Completion.Exception != null || processBlock2.Completion.Exception != null)
        {
            throw readBlock.Completion.Exception;
        }

Я заглянул в интернет, чтобы увидеть предложенный подход, но не увидел ничего очевидного. Итак, я создал пример кода ниже и надеялся получить руководство по лучшему решению:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TPLDataflow
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                //ProcessB();
                ProcessA();
            }
            catch(Exception e)
            {
                Console.WriteLine("Exception in Process!");
                throw new Exception($"exception:{e}");
            }
            Console.WriteLine("Processing complete!");
            Console.ReadLine();
        }

        private static void ProcessB()
        {
            Task.WhenAll(Task.Run(() => DoSomething(1, "ProcessB"))).Wait();
        }

        private static void ProcessA()
        {
            var random = new Random();
            var readBlock = new TransformBlock<int, int>(
                    x => { try { return DoSomething(x, "readBlock"); } catch (Exception e) { throw e; } },
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }); //1

            var braodcastBlock = new BroadcastBlock<int>(i => i); // ⬅ Here

            var processBlock1 =
                new TransformBlock<int, int>(x => DoSomethingAsync(5, "processBlock1")); //2
            var processBlock2 =
                new TransformBlock<int, int>(x => DoSomethingAsync(2, "processBlock2")); //3

            //var saveBlock =
            //    new ActionBlock<int>(
            //    x => Save(x)); //4

            var saveBlockJoinedProcess =
                new ActionBlock<Tuple<int, int>>(
                x => SaveJoined(x.Item1, x.Item2)); //4

            var saveBlockJoin = new JoinBlock<int, int>();

            readBlock.LinkTo(braodcastBlock, new DataflowLinkOptions { PropagateCompletion = true });

            braodcastBlock.LinkTo(processBlock1,
                new DataflowLinkOptions { PropagateCompletion = true }); //5

            braodcastBlock.LinkTo(processBlock2,
                new DataflowLinkOptions { PropagateCompletion = true }); //6


            processBlock1.LinkTo(
                saveBlockJoin.Target1); //7

            processBlock2.LinkTo(
                saveBlockJoin.Target2); //8

            saveBlockJoin.LinkTo(saveBlockJoinedProcess, new DataflowLinkOptions { PropagateCompletion = true });

            readBlock.Post(1); //10
                                //readBlock.Post(2); //10

            Task.WhenAll(
                        processBlock1.Completion,
                        processBlock2.Completion)
                        .ContinueWith(_ => saveBlockJoin.Complete());

            readBlock.Complete(); //12
            saveBlockJoinedProcess.Completion.Wait(); //13
            if (readBlock.Completion.Exception != null || saveBlockJoinedProcess.Completion.Exception != null || processBlock1.Completion.Exception != null || processBlock2.Completion.Exception != null)
            {
                throw readBlock.Completion.Exception;
            }
        }
        private static int DoSomething(int i, string method)
        {
            Console.WriteLine($"Do Something, callng method : { method}");
            throw new Exception("Fake Exception!");
            return i;
        }
        private static async Task<int> DoSomethingAsync(int i, string method)
        {
            Console.WriteLine($"Do SomethingAsync");
            throw new Exception("Fake Exception!");
            await Task.Delay(new TimeSpan(0,0,i));
            Console.WriteLine($"Do Something : {i}, callng method : { method}");
            return i;
        }
        private static void Save(int x)
        {

            Console.WriteLine("Save!");
        }
        private static void SaveJoined(int x, int y)
        {
            Thread.Sleep(new TimeSpan(0, 0, 10));
            Console.WriteLine("Save Joined!");
        }
    }
}

Ответы [ 2 ]

1 голос
/ 10 июля 2019

Я посмотрел в Интернете, чтобы увидеть, что такое предлагаемый подход, но не увидел ничего очевидного.

Если у вас есть конвейер (более или менее), то общий подход заключается в использовании PropagateCompletion для выключения канала. Если у вас более сложные топологии, вам нужно будет завершать блоки вручную.

В вашем случае у вас есть попытка распространения здесь:

Task.WhenAll(
    processBlock1.Completion,
    processBlock2.Completion)
    .ContinueWith(_ => saveBlockJoin.Complete());

Но этот код не будет распространять исключения. Когда оба processBlock1.Completion и processBlock2.Completion завершены, saveBlockJoin завершено успешно .

Лучшим решением было бы использовать await вместо ContinueWith:

async Task PropagateToSaveBlockJoin()
{
    try
    {
        await Task.WhenAll(processBlock1.Completion, processBlock2.Completion);
        saveBlockJoin.Complete();
    }
    catch (Exception ex)
    {
        ((IDataflowBlock)saveBlockJoin).Fault(ex);
    }
}
_ = PropagateToSaveBlockJoin();

Использование await побуждает вас обрабатывать исключения, что можно сделать, передав их в Fault для распространения исключения.

0 голосов
/ 09 июля 2019

на первый взгляд, если есть только некоторые незначительные моменты (не смотря на вашу архитектуру). мне кажется, что вы смешали несколько более новых и более старых конструкций. и есть некоторые части кода, которые не нужны.

например:

private static void ProcessB()
{
    Task.WhenAll(Task.Run(() => DoSomething(1, "ProcessB"))).Wait();
}

с использованием метода Wait (), если какие-либо исключения случаются, они будут помещены в исключение System.AggregateException. на мой взгляд, так лучше:

private static async Task ProcessBAsync()
{
    await Task.Run(() => DoSomething(1, "ProcessB"));
}

с использованием async-await, если возникает исключение, оператор await сбрасывает первое исключение, заключенное в исключение System.AggregateException. Это позволяет вам пытаться перехватить конкретные типы исключений и обрабатывать только те случаи, которые вы действительно можете обработать.

другая часть этой части кода:

private static void ProcessA()
        {
            var random = new Random();
            var readBlock = new TransformBlock<int, int>(
                    x => 
                    { 
                    try { return DoSomething(x, "readBlock"); } 
                    catch (Exception e) 
                    { 
                    throw e; 
                    } 
                    },
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }); //1

Зачем ловить исключение только для его отбрасывания? в этом случае try-catch является избыточным.

А вот это:

private static void SaveJoined(int x, int y)
{
    Thread.Sleep(new TimeSpan(0, 0, 10));
    Console.WriteLine("Save Joined!");
}

Гораздо лучше использовать await Task.Delay(....). Используя Task.Delay(...), ваше приложение не будет зависать.

...