Распределение исключений между потоками (Reactive Extensions) - PullRequest
2 голосов
/ 28 февраля 2011

У меня есть последовательность IEnumerable, которая содержит некоторые блокирующие сетевые операции (заменены на некоторые простые выходы в приведенном ниже примере кода). Я использую Reactive Extensions для преобразования потока данных, поступающих через сеть, в наблюдаемую последовательность.

Я ищу способ перенести исключения в основной поток, чтобы необработанные исключения не приводили к прекращению работы моего приложения. Я не могу разместить блоки try / catch в потоке IEnumerable, поскольку компилятор не разрешает операторы yield return внутри операторов try / catch.

using System;
using System.Collections.Generic;
using System.Concurrency;
using System.Linq;
using System.Text;
using System.Threading;

namespace ConsoleApplication7
{
    class Program
    {
        static void Main(string[] args)
        {
            try
            {
                Console.WriteLine("Main thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
                var observable = TestEnumerable().ToObservable(Scheduler.NewThread); //Needs to be on a new thread because it contains long-running blocking operations

                // Use subject because we need many subscriptions to a single data source
                var subject = new Subject<int>();

                subject.Subscribe(x => Console.WriteLine("Subscriber1: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                    x => Console.WriteLine("Subscriber1 ERROR: " + x+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                    () => Console.WriteLine("Subscriber1 Finished"+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId));
                subject.Subscribe(x => Console.WriteLine("Subscriber2: " + x + " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                    x => Console.WriteLine("Subscriber2 ERROR: " + x+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId),
                    () => Console.WriteLine("Subscriber2 Finished"+ " on thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId));

                Console.WriteLine("Press key to start receiving data");
                Console.ReadKey();
                var sub = observable.Subscribe(subject);

                Console.WriteLine("Press key to exit");
                Console.ReadKey();
                sub.Dispose();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Caught exception on main thread");
            }

        }

        public static IEnumerable<int> TestEnumerable()
        {
            while (true)
            {
                yield return 1;
                Thread.Sleep(200);
                yield return 2;
                Thread.Sleep(200);
                yield return 3;
                Thread.Sleep(200);
                throw new InvalidOperationException();
            }
        }
    }
}

1 Ответ

3 голосов
/ 28 февраля 2011

Решение зависит от того, доступен ли вам Dispatcher / SynchronisationContext. Конечно, предпочтительно использовать один в этом сценарии.

Решение 1: Диспетчер / СинхронизацияКонтент доступен

(т.е. с использованием WPF, Windows Forms или пользовательского цикла Dispatcher)

Вы можете использовать ObserveOn + Catch, чтобы переместить ошибку обратно в поток Dispatcher. Я видел, как это используется в приложении WPF, и оно работало хорошо.

Как вы перемещаете ваш IScheduler / DispatcherScheduler, зависит от вас (мы использовали IoC)

public static IObservable<T> CatchOn<T>(this IObservable<T> source, 
    IScheduler scheduler)
{
    return source.Catch<T,Exception>(ex => 
        Observable.Throw<T>(ex).ObserveOn(scheduler));
}

// We didn't use it, but this overload could useful if the dispatcher is 
// known at the time of execution, since it's an optimised path
public static IObservable<T> CatchOn<T>(this IObservable<T> source, 
    DispatcherScheduler scheduler)
{
    return source.Catch<T,Exception>(ex => 
        Observable.Throw<T>(ex).ObserveOn(scheduler));
}

Решение 2: Диспетчер недоступен

Вместо использования Console.ReadKey() используйте ManualResetEvent и дождитесь его, а затем сгенерируйте изменчивую ошибку:

        static void Main(string[] args)
        {
            try
            {
                Console.WriteLine("Main thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
                var observable = TestEnumerable().ToObservable(Scheduler.NewThread); //Needs to be on a new thread because it contains long-running blocking operations

                // Use subject because we need many subscriptions to a single data source
                var subject = new Subject<int>();

                Exception exception = null;
                ManualResetEvent mre = new ManualResetEvent(false);

                using(subject.Subscribe(
                    x => Console.WriteLine(x),
                    ex => { exception = ex; mre.Set(); },
                    () => Console.WriteLine("Subscriber2 Finished")))

                using(subject.Subscribe(
                    x => Console.WriteLine(x),
                    ex => { exception = ex; mre.Set(); },
                    () => Console.WriteLine("Subscriber2 Finished")))

                using (observable.Subscribe(subject))
                {
                    mre.WaitOne();
                }

                if (exception != null)
                    throw exception;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Caught exception on main thread");
            }

        }
...