Почему мой процесс все еще жив после исключения, выданного в SelectMany, тогда как исключение в подобном операторе rx приводит к необработанному исключению? - PullRequest
1 голос
/ 16 октября 2019

Вот пример программы, которая делает две подписки на консольный ввод (источник, наблюдаемый здесь не имеет значения). В первой подписке используется Observable.SelectMany, а во второй - аналогичный оператор SelectMany, который внутренне использует пакет System.Threading.Tasks.Dataflow . Исключение выдается для определенных входных данных в каждом из них. Исключение корректно пересылается в Observer onError, который перебрасывает его в стандартной реализации подписки. Наблюдаемое поведение состоит в том, что в случае исключения в SelectMany процесс продолжает выполняться, а в случае исключения в SelectManyPreseveOrder процесс завершается с необработанным исключением. В чем причина разного поведения? Есть ли способ добиться «более дружественного» поведения в операторе SelectManyPreserveOrder? Это консольное приложение .net 4.6.1, использующее Rx.Linq 2.2.5 и System.Threading.Tasks.Dataflow 4.10.0:

class Program
{
    static async Task Main()
    {
        AppDomain.CurrentDomain.UnhandledException += (sender, args) => Console.WriteLine("App domain unhandled exception");
        TaskScheduler.UnobservedTaskException += (sender, args) => Console.WriteLine("Unobserved task exception");

        var consoleInput = Helper.ConsoleInput();

        consoleInput.SelectMany(async input =>
            {
                await Task.Delay(50).ConfigureAwait(false);
                if (input == "1")
                    throw new Exception("This exception is swallowed");
                return input;
            })
            .Subscribe(s => Console.WriteLine($"SelectMany: {s}"));

        consoleInput.SelectManyPreserveOrder(async input =>
            {
                await Task.Delay(50).ConfigureAwait(false);
                if (input == "2")
                    throw new Exception("This exception kills the process");
                return input;
            })
            .Subscribe(s => Console.WriteLine($"SelectMany (TPL Dataflow): {s}"));

        await Task.Delay(TimeSpan.FromMinutes(10)).ConfigureAwait(false);
    }
}

public static class ObservableExtension
{
    public static IObservable<TResult> SelectManyPreserveOrder<TSource, TResult>(this IObservable<TSource> source, Func<TSource, Task<TResult>> selector, int maxParallelBatches = 1)
    {
        return source.FromTplDataflow(() =>
            new TransformBlock<TSource, TResult>(selector,
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelBatches }));
    }

    public static IObservable<TResult> FromTplDataflow<T, TResult>(
        this IObservable<T> source, Func<IPropagatorBlock<T, TResult>> blockFactory)
    {
        return Observable.Defer(() =>
        {
            var block = blockFactory();
            return Observable.Using(() =>
            {
                var disposable = source.Subscribe(block.AsObserver());
                return Disposable.Create(dispose: () => disposable.Dispose());
            }, r => block.AsObservable());
        });
    }
}

public static class Helper
{
    public static IObservable<string> ConsoleInput()
    {
        return
            Observable
                .FromAsync(() => Console.In.ReadLineAsync())
                .Repeat()
                .Publish()
                .RefCount()
                .SubscribeOn(Scheduler.Default);
    }
}

Интересно, что обработчик UnobservedTaskException никогда не вызывается.

1 Ответ

2 голосов
/ 17 октября 2019

Исключение выдается здесь, но оно выбрасывается в ненаблюдаемое продолжение задачи . В .NET 4.5 и выше ненаблюдаемые исключения задач будут обрабатываться средой выполнения автоматически. Вот хорошая статья Стивена Тауба, в которой говорится об этом изменении .

Важный бит:

Чтобы разработчикам было проще писать асинхронный код на основе задач.NET 4.5 изменяет поведение исключений по умолчанию для ненаблюдаемых исключений. Хотя ненаблюдаемые исключения по-прежнему будут вызывать событие UnobservedTaskException (в противном случае это будет критическое изменение), процесс по умолчанию не завершится сбоем. Скорее, исключение в итоге будет съедено после возникновения события, независимо от того, соблюдает ли обработчик события исключение.

...