Продолжить обработку цикла с использованием конвейера потока данных - PullRequest
0 голосов
/ 01 мая 2019

Я играю с потоками данных и пытаюсь научиться их использовать. Я нашел множество примеров, показывающих, как использовать различные блоки, но ни один из них не объясняет, как обрабатывать исключения.

Мой главный вопрос - как продолжить цикл foreach, если возникает исключение или вывод предыдущего блока преобразования не соответствует ожидаемому? Ниже приведено простое приложение Windows Forms, которое я использую для тестирования. Это всего лишь одна кнопка, которая просматривает список чисел и отображает их.

Я добавил оператор if в блок действия, который говорит, что если число = 5, выведите исключение. Цикл выглядит так, как будто он продолжает обработку после попадания в исключение, но прекращает запись выходных данных после попадания в исключение. Исключение также никогда не переходит к предложению catch в цикле foreach.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks.Dataflow;
using System.Windows.Forms;

namespace DataFlowsTest
{
    public partial class Form1 : Form
    {
        public Form1()
        {
            InitializeComponent();
        }

        private void button1_Click(object sender, EventArgs e)
        {
            List<int> TestList = new List<int>() { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            var actionBlock = new ActionBlock<int>(item =>
            {
                if (item == 5)
                    throw new Exception("Blech.");
                Debug.WriteLine(item.ToString());
            });


            foreach(var number in TestList)
            {
                try
                {
                    actionBlock.Post(number);
                }
                catch(AggregateException ex)
                {
                    Debug.WriteLine(ex.Message);
                    continue;
                }
            }
            actionBlock.Complete();
        }
    }
}

Этот код возвращается 1 2 3 4 Исключение: «System.Exception» в DataFlowsTest.exe Исключение типа «System.Exception» произошло в DataFlowsTest.exe, но не было обработано в коде пользователя Blech.

Ответы [ 3 ]

1 голос
/ 01 мая 2019

Вот как я это реализовал.Я могу поделиться гораздо больше о Github, если этот подход вас интересует.Я использую поток данных довольно часто, поэтому я реализовал множество других классов IDataFlow на основе этого подхода.

Строительный блок

По сути, оборачивая каждое сообщение в класс, называемый Flow<T>, мы можем реализовать подход Railway Oriented .Поток имеет два состояния: неудача или успех.Успешные потоки Flow<T> передаются в следующий пользовательский поток данных или подключаются к FailureBlock : ITargetBlock<IFlow> в случае сбоя.(по существу, ActionBlock<IFlow>, который имеет дело с исключениями, журналами и т. д.

Мой базовый класс Flow выглядит следующим образом:

public class Flow<T> : IFlow
{
  public T Value { get; private set; }
  public Exception Exception { get; private set; }
  public bool Success => Exception is null;
  public bool Failure => !Success;
  public void Fail(Exception exception) => Exception = exception;
  public Flow(T value) => Data = value;
  public Flow(Exception exception) => Fail(exception);
  public static Flow<T> FromValue<T>(T data) => new Flow<T>(data);
}
public interface IFlow
{
  bool Success { get; }
  bool Failure { get; }
  Exception Exception { get; }
  void Fail(Exception exception);
}

Результирующий пользовательский поток данных IData

Следующая часть выглядит страшно, но не надо. По сути, это оболочка TransformBlock с двумя дополнительными функциями:

введите код здесь 1. каждый пользовательский FlowBlock<T1,T2> упаковывает методы в try { } catch { }

метод LinkTo связывает успешные потоки со следующим блоком и сбои с FailureBlock
public class FlowBlock<TInput, TOutput>: IPropagatorBlock<Flow<TInput>, Flow<TOutput>>
{
    protected override ITargetBlock<Flow<TInput>> Target => TransformBlock;
    protected override ISourceBlock<Flow<TOutput>> Source => TransformBlock;
    private TransformBlock<Flow<TInput>, Flow<TOutput>> TransformBlock { get; }
    private FailureBlock FailureBlock { get; }
    public FlowBlock(
        Func<TInput, Task<TOutput>> transform,
        ExecutionDataflowBlockOptions dataflowBlockOptions,
        FailureBlock failureBlock)
    {
        TransformBlock = new TransformBlock<Flow<TInput>, Flow<TOutput>>(
            async inFlow =>
            {
                try
                {
                    return new Flow<TOutput>(await transform(inFlow.Data));
                }
                catch (Exception exception)
                {
                    return new Flow<TOutput>(exception);
                }
            },
            dataflowBlockOptions);
    }
    public override IDisposable LinkTo(
        ITargetBlock<Flow<TOutput>> target,
        DataflowLinkOptions linkOptions)
        => new Disposable(
            Source.LinkTo(target, linkOptions, flow => flow.Success),
            Source.LinkTo(OutputBlock, linkOptions, flow => flow.Failure));
}

Дайте мне знать в комментариях, если вы заинтересованы, и я с удовольствием откроюрепозиторий Github с гораздо большим количеством деталей.

0 голосов
/ 02 мая 2019

Вы бросаете исключение в своем блоке.Это приведет к тому, что блок перейдет в состояние ошибки и присоединит исключение к его задаче Completion.

Ваш код имеет try/catch только вокруг actionBlock.Post, это не то место, где выдается исключение.

Поскольку исключение прикреплено к задаче завершения, единственный способ перехватить исключение за пределами блока - это await actionBlock.Completion, который перезапустит исключение за пределами блока и позволит вам перехватить (Exception ex).

Для предотвращения отказа блока;поймать исключение в блоке.Если исключение покидает блок, блок будет поврежден и больше не будет принимать новые входные данные.

var actionBlock = new ActionBlock<int>(item =>
{
    try
    {
        if (item == 5)
            throw new Exception("Blech.");
        Debug.WriteLine(item.ToString());
    }
    catch (Exception ex)
    {
        Debug.WriteLine(ex);
    }

});

Кроме того, вы также можете обработать результат Post или лучше SendAsync и реагировать наошибка блока:

foreach (var number in TestList)
{
    if(!actionBlock.Post(number))
        actionBlock.Complete();
    try
    {
        await actionBlock.Completion;
    }
    catch (Exception ex)
    {
        Debug.WriteLine(ex.Message);
        //actionBlock is now dead
        break;
    }
}
0 голосов
/ 01 мая 2019

Вы бросаете Exception, но только ловите AggregateException

Добавить улов для универсального (Exception ex) или для типа, который вы хотите поймать

...