Я наткнулся на этот вопрос, когда искал что-то еще.Кажется, есть некоторая путаница относительно того, почему подписка на observable2 не осуществляется одновременно.Ответ прост: Rx не является многопоточным по умолчанию.Это зависит от вас, чтобы управлять потоками / планированием, и это поможет с параллелизмом.@ Энигмативность ускользает от этого, но я подумал, что это требует более глубокого объяснения.
Чтобы быть конкретным;у нас есть следующий (обобщенный) код
var observable1 = Observable.Create<int>(i =>
{
System.Threading.Thread.Sleep(2000);
i.OnNext(1);
i.OnCompleted();
return () => { };
});
var observable2 = Observable.Create<int>(i =>
{
System.Threading.Thread.Sleep(4000);
i.OnNext(1);
i.OnCompleted();
return () => { };
});
var m = observable1.Zip(observable2, (a, b) => new { a, b });
m.Subscribe(Console.WriteLine);
Если пройти этот шаг за шагом, становится очевидным, в чем проблема.
- Мы объявляем 2 наблюдаемые последовательности с помощью Createфабричный метод
- Мы составляем 2 последовательности с помощью метода Zip
- Мы подписываемся на составную последовательность (m)
- Затем вызывается делегат, предоставленный методу Create дляobservable1.
- Мы вступаем в делегат и немедленно спим в течение 2 секунд.Обратите внимание, что ни в коем случае мы не меняли темы здесь.Размещенный код является однопоточным.
- Мы продолжаем в делегате и OnNext значение 1, затем завершаем последовательность
- Все еще в том же потоке (потому что это однопоточный), затем мы подписываемся на Observable2 и вступаем в егоделегат
- Мы спим в течение 4 секунд
- Мы в следующем 1. Это передается оператору Zip, который ждал, когда 2-я последовательность выдаст значение.
- Вызывается функция zip resultSelector и создается тип Anon a = 1, b = 1 и отправляется на консоль (с помощью метода Subscribe)
- Последовательность завершается.
Хорошо, так что ясно, что никогда не сможет работать.Но, к счастью, это всего лишь пример, который @foson использует для описания своего вопроса.По иронии судьбы, если бы они использовали FromAsyncPattern, это внесло бы некоторый параллелизм в их код, и это сработало бы.
Правильный способ демонстрации Zip, работающего с задержкой 2 с и 4 с, - это одновременное выполнение.Вы можете сделать это, запланировав OnNext в текущем потоке, или используя для этого другой поток / таймер.
var observable1 = Observable.Timer(TimeSpan.FromSeconds(2));
var observable2 = Observable.Timer(TimeSpan.FromSeconds(4));
var zipped = observable1.Zip(observable2, (a, b) => new { a, b });
zipped.Subscribe(Console.WriteLine);
Здесь мы используем удобную фабрику Observable.Timer.Он создаст последовательность, которая публикует значение 0 в указанный период с момента подписки, а затем завершается.Если у вас есть предпочтения относительно того, как следует планировать таймер, вы можете также указать дополнительный планировщик, например
var observable1 = Observable.Timer(TimeSpan.FromSeconds(2), Scheduler.ThreadPool);
var observable2 = Observable.Timer(TimeSpan.FromSeconds(4), Scheduler.TaskPool);
Планировщик по умолчанию (на момент написания v.1.0.10621.0 для библиотеки .NET 4.0)это Scheduler.ThreadPool.
Вы можете узнать больше о планировании в моем вступлении к серии Rx:
http://leecampbell.blogspot.com/2010/08/reactive-extensions-for-net.html В частности, пост планирования и потоков
http://leecampbell.blogspot.com/2010/06/rx-part-6-scheduling-and-threading.html
Надеюсь, это прояснит проблему с оригинальным постом.