Silverlight Task.WaitAll с использованием Rx - PullRequest
1 голос
/ 13 июля 2011

Я хотел бы вызвать два веб-сервиса одновременно и обработать ответы, когда оба будут сделаны.Я вызываю веб-сервисы, используя метод Obxable.FromAsyncPattern от Rx.Какой правильный метод для одновременной подписки на несколько IObservables?

Я пытался использовать Zip, но он не запускается одновременно, только начинается второй после получения первого результата.
EDIT: Вот демонстрация Zipили некоторые другие предложенные решения не решают мою проблему -

class Program
{
    static void Main(string[] args)
    {

        var observable1 = Observable.Create<int>(i =>
            {
                Console.WriteLine("starting 1");
                System.Threading.Thread.Sleep(2000);
                Console.WriteLine("done sleeping 1");
                i.OnNext(1);
                i.OnCompleted();
                return () => { };
            });
        var observable2 = Observable.Create<int>(i =>
        {
            Console.WriteLine("starting 2");
            System.Threading.Thread.Sleep(4000);
            Console.WriteLine("done sleeping 2");
            i.OnNext(1);
            i.OnCompleted();
            return () => { };
        });

        var m = observable1.Zip(observable2, (a, b) => new { a, b });

        var n = Observable.Merge(Scheduler.ThreadPool,
            observable1, observable2);

        var o = Observable.When(observable1.And(observable2).Then((a, b) => new { a, b }));


        m.Subscribe(
            (i) => Console.WriteLine(i),
            () => Console.WriteLine("finished"));

        Console.Read();
    }
}

Результаты:

starting 1
done sleeping 1
starting 2
done sleeping 2
{ a = 1, b = 1 }
finished

Желаемые результаты:

starting 1
starting 2
done sleeping 1
done sleeping 2
{ a = 1, b = 1 }
finished

Ответы [ 5 ]

2 голосов
/ 13 июля 2011

Использование метода расширения Zip является простым ответом здесь.

Если у вас есть пара типичных асинхронных вызовов (при условии, что используется один параметр):

Func<X1, IObservable<X2>> callX = Observable.FromAsyncPattern<X1, X2>(...);
Func<Y1, IObservable<Y2>> callY = Observable.FromAsyncPattern<Y1, Y2>(...);

Тогда вы можетевызовите оба и обработайте возвращаемые значения после того, как оба будут завершены, как показано ниже:

1 голос
/ 13 июля 2011

Соединения Rx обеспечивают решение.

Observable.And

Соответствует, когда обе наблюдаемые последовательности имеют доступное значение.

Демонстрация:

var xsL = Observable.Return(1);
var xsR = Observable.Return(2);
Observable<int> both =  Observable.When(xsL.And(xsR).Then((a,b) => a + b));
0 голосов
/ 10 февраля 2012

Я наткнулся на этот вопрос, когда искал что-то еще.Кажется, есть некоторая путаница относительно того, почему подписка на observable2 не осуществляется одновременно.Ответ прост: Rx не является многопоточным по умолчанию.Это зависит от вас, чтобы управлять потоками / планированием, и это поможет с параллелизмом.@ Энигмативность ускользает от этого, но я подумал, что это требует более глубокого объяснения.

Чтобы быть конкретным;у нас есть следующий (обобщенный) код

var observable1 = Observable.Create<int>(i =>
    {
        System.Threading.Thread.Sleep(2000);
        i.OnNext(1);
        i.OnCompleted();
        return () => { };
    });
var observable2 = Observable.Create<int>(i =>
{
    System.Threading.Thread.Sleep(4000);
    i.OnNext(1);
    i.OnCompleted();
    return () => { };
});

var m = observable1.Zip(observable2, (a, b) => new { a, b });
m.Subscribe(Console.WriteLine);

Если пройти этот шаг за шагом, становится очевидным, в чем проблема.

  1. Мы объявляем 2 наблюдаемые последовательности с помощью Createфабричный метод
  2. Мы составляем 2 последовательности с помощью метода Zip
  3. Мы подписываемся на составную последовательность (m)
  4. Затем вызывается делегат, предоставленный методу Create дляobservable1.
  5. Мы вступаем в делегат и немедленно спим в течение 2 секунд.Обратите внимание, что ни в коем случае мы не меняли темы здесь.Размещенный код является однопоточным.
  6. Мы продолжаем в делегате и OnNext значение 1, затем завершаем последовательность
  7. Все еще в том же потоке (потому что это однопоточный), затем мы подписываемся на Observable2 и вступаем в егоделегат
  8. Мы спим в течение 4 секунд
  9. Мы в следующем 1. Это передается оператору Zip, который ждал, когда 2-я последовательность выдаст значение.
  10. Вызывается функция zip resultSelector и создается тип Anon a = 1, b = 1 и отправляется на консоль (с помощью метода Subscribe)
  11. Последовательность завершается.

Хорошо, так что ясно, что никогда не сможет работать.Но, к счастью, это всего лишь пример, который @foson использует для описания своего вопроса.По иронии судьбы, если бы они использовали FromAsyncPattern, это внесло бы некоторый параллелизм в их код, и это сработало бы.

Правильный способ демонстрации Zip, работающего с задержкой 2 с и 4 с, - это одновременное выполнение.Вы можете сделать это, запланировав OnNext в текущем потоке, или используя для этого другой поток / таймер.

var observable1 = Observable.Timer(TimeSpan.FromSeconds(2));
var observable2 = Observable.Timer(TimeSpan.FromSeconds(4));

var zipped = observable1.Zip(observable2, (a, b) => new { a, b });
zipped.Subscribe(Console.WriteLine);

Здесь мы используем удобную фабрику Observable.Timer.Он создаст последовательность, которая публикует значение 0 в указанный период с момента подписки, а затем завершается.Если у вас есть предпочтения относительно того, как следует планировать таймер, вы можете также указать дополнительный планировщик, например

var observable1 = Observable.Timer(TimeSpan.FromSeconds(2), Scheduler.ThreadPool);
var observable2 = Observable.Timer(TimeSpan.FromSeconds(4), Scheduler.TaskPool);

Планировщик по умолчанию (на момент написания v.1.0.10621.0 для библиотеки .NET 4.0)это Scheduler.ThreadPool.

Вы можете узнать больше о планировании в моем вступлении к серии Rx:

http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html В частности, пост планирования и потоков

http://leecampbell.blogspot.com/2010/06/rx-part-6-scheduling-and-threading.html

Надеюсь, это прояснит проблему с оригинальным постом.

0 голосов
/ 13 июля 2011

Observable.Create не является правильной моделью для Observable.FromAsyncPattern; Zip, Merge и When предложенные решения будут работать для Observable.FromAsyncPattern.

Observable.FromAsyncPattern добавляет параллелизм (либо из базового вызова BeginXXX, либо с помощью AsyncSubject (Scheduler.ThreadPool)), тогда как Observable.Create будет по умолчанию планировать текущий поток.

Лучшей моделью для Observable.FromAsyncPattern будет:

var observable1 = Observable.Create<int>(i =>
{
    return Scheduler.ThreadPool.Schedule(() =>
    {
        Console.WriteLine("starting 1");
        System.Threading.Thread.Sleep(4000);
        Console.WriteLine("done sleeping 1");
        i.OnNext(1);
        i.OnCompleted();
    });
});
0 голосов
/ 13 июля 2011

Zip был правильным ответом:

Observable.Zip(someWebService(), otherWebService(), (some, other) => new { some, other })
    .Subscribe(someAndOther => {
        Console.WriteLine(Some is {0}, and Other is {1}", someAndOther.some, someAndOther.other);
    });

Предполагая, что someWebService - это метод, подпись которого выглядит следующим образом:

IObservable<SomeClass> someWebService()

Если Zip не делает то, что вы хотите,это проблема с тем, как вы вызываете веб-сервис ...

Кроме того, для общего случая, если вы хотите знать, когда завершается куча наблюдаемых, вы можете подделать это с помощью Aggregate:

Observable.Merge(
    observable1.Select(_ => Unit.Default),
    observable2.Select(_ => Unit.Default),
    observable3.Select(_ => Unit.Default),
    observable4.Select(_ => Unit.Default))
.Aggregate(Unit.Default, (acc, _) => acc)
.Subscribe(_ => Console.WriteLine("They all finished!");

Затем вы можете бросить First () в конец вместо Subscribe, если вы хотите заблокировать.

...