Оператор Rx для запуска задач последовательно, собирая исключения в конце - PullRequest
1 голос
/ 28 декабря 2011

Я использую Rx в Silverlight 4 для вызова моего кода доступа к данным (при отсутствии места TPL).

Я хотел бы сделать 3 вызова веб-службы последовательно (пока не параллельно) иполучить свои результаты.Раньше я использовал SelectMany:

var seq = from a1 in Observable.Return(5).Delay(TimeSpan.FromSeconds(2))
          from b1 in Observable.Return(6).Delay(TimeSpan.FromSeconds(2))
          from c1 in Observable.Return(7).Delay(TimeSpan.FromSeconds(2))
          select new { a1, b1, c1 };

, но я хотел бы, чтобы 2-й и 3-й вызовы все еще выполнялись, даже если первый имеет исключение.

Есть ли оператор Rx, который будетобъединить последовательности, но только OnException после того, как все его последовательности завершены?Что-то, что будет функционально эквивалентно следующему коду:

using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace ConsoleApplication4
{

    public class Results
    {
        public int A { get; set; }
        public int B { get; set; }
        public string C { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            new Program().Test();
        }

        public void Test()
        {

            GetResults().SubscribeOn(Scheduler.NewThread).Subscribe(
                results => Console.WriteLine("{0} {1} {2}", results.A, results.B, results.C),
                ex => Console.WriteLine(ex.ToString()),
                () => Console.WriteLine("Completed")
            );

            Console.WriteLine("Not blocking");

            Console.Read();
        }

        public IObservable<Results> GetResults()
        {
            return Observable.Create<Results>(obs =>
                {

                    var a = Observable.Return(5).Delay(TimeSpan.FromSeconds(2));
                    var b = Observable.Throw<int>(new Exception("uh oh")).Delay(TimeSpan.FromSeconds(2));
                    var c = Observable.Return("7").Delay(TimeSpan.FromSeconds(2));

                    var results = new Results();
                    var exceptions = new List<Exception>();

                    try
                    {
                        results.A = a.FirstOrDefault();
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                    }

                    try
                    {
                        results.B = b.FirstOrDefault();
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                    }

                    try
                    {
                        results.C = c.FirstOrDefault();
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                    }

                    obs.OnNext(results);
                    if (exceptions.Count > 0)
                        obs.OnError(new AggregateException(exceptions.ToArray()));
                    else
                        obs.OnCompleted();
                    return Disposable.Empty;
                });
        }
    }
}

1 Ответ

1 голос
/ 29 декабря 2011

Как насчет:

var result = Observable.Concat(
    startObservable1().Catch(Observable.Return<TTheType>(null)),
    startObservable2().Catch(Observable.Return<TTheType>(null)),
    startObservable3().Catch(Observable.Return<TTheType>(null)));

Существует ли оператор Rx, который будет объединять последовательности, но только OnException после завершения всех его последовательностей?

Эта часть немного сложнее, я использую этот класс, хотя я не очень доволен им:

public class Maybe<T>
{
    public Exception Exception { get; protected set; }

    T _Value;
    public T Value {
        get {
            if (Exception != null) {
                throw Exception;
            }
            return _Value;
        }
        protected set { _Value = value; }
    }

    public static Maybe<T> AsError(Exception ex)
    {
        return new Maybe<T>() {Value = default(T), Exception = ex};
    }

    public static Maybe<T> AsValue(T value)
    {
        return new Maybe<T>() {Value = value};
    }
}

Тогда вы можете сделать это:

var result = Observable.Concat(
    startObservable1().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)),
    startObservable2().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)),
    startObservable3().Select(x => Maybe.AsValue(x)).Catch<T1, Exception>(x => Maybe.AsError(x)));

Возможно, вы могли бы написать свой собственный метод расширения Maybeify (), который скрывает Select + Catch.

...