Разница между холодной наблюдаемой в RX и нормальной Enumerable - PullRequest
6 голосов
/ 27 августа 2011

Я новичок в Rx.Я вижу некоторые реальные преимущества использования Hot Observables, однако мне недавно был задан вопрос о том, в чем разница между наблюдаемой в холодном режиме и эквивалентным перечислимым (см. Фрагмент кода ниже) ...

    var resRx = Observable.Range(1, 10);
    var resOb = Enumerable.Range(1, 10);

Можеткто-нибудь очень просто объясняет, в чем разница между этими двумя понятиями, и какую выгоду я получу от наблюдаемого холода и перечисляемого.

Ответы [ 2 ]

15 голосов
/ 27 августа 2011

Существует несколько различий между ними.

Кто контролирует «перечисление»

С наблюдаемым (горячим или холодным) именно наблюдаемое определяеткогда и в каком потоке возвращаются значения.С другой стороны, перечислимое получает каждое значение, когда вы запрашиваете его, и обрабатывается в потоке, запрашивающем перечисление.

Поток кода

Обработка перечислимых элементов обычно выполняется вдля каждого цикла (или иногда получить перечислитель и использовать цикл while).Ваш код обычно обрабатывает все значения перед продолжением.Наблюдаемые требуют обратного вызова.Блокирование дальнейшего выполнения вашего кода (скажем, чтобы не выходить из консольного приложения) требует дополнительного кода с вашей стороны.Есть некоторые блокирующие операторы для наблюдаемых, такие как First, но они являются скорее исключением, чем правилом для обычного использования.

Возьмем этот простой пример программы в качестве примера.С перечислимым все значения записываются, прежде чем перейти к следующей части.Однако значения из наблюдаемого не гарантируются, что они когда-либо будут записаны.Сколько значений записывается до завершения программы - это в основном условие гонки.

static void Main(string[] args)
{
    var xs = Enumerable.Range(1, 10);
    foreach (var x in xs)
    {
        Console.WriteLine(x);
    }
    //at this point, all values have been written

    var ys = Observable.Range(1, 10);
    ys.Subscribe(y => Console.WriteLine(y));
    //at this point, no values have been written (in general)

    //either add a Console.ReadKey or some sort of wait handle that
    //is set in the OnCompleted of the observer to get values
}

Асинхронные процессы

Так же, как вы должны написать дополнительный код для блокировки идождитесь наблюдаемого, для написания IEnumerable, который использует асинхронный процесс, требуется дополнительная работа.Вот где разница между ними действительно проявляется.

Например, в приложении, над которым я сейчас работаю, мне нужно искать устройства, которые могут быть подключены к последовательному порту.IObservable хорошо подходит для этого, потому что он позволяет мне принимать обратный вызов и уведомлять приложение для каждого устройства всякий раз, когда я его нахожу, без необходимости блокировать и когда операция завершена.Эта наблюдаемая квалифицируется как холодная наблюдаемая, потому что она не будет выталкивать данные, если нет подписчика, и каждая подписка получает все результаты.(В отличие от типичного холодного наблюдения, я начинаю работу перед подпиской, но данные не теряются, поскольку они буферизируются в тему воспроизведения.) Однако для меня не имеет особого смысла преобразовывать их в Enumerable из-за асинхронногоприрода.

6 голосов
/ 28 августа 2011

Почти все перечисляемые, к которым вы привыкли, являются "холодными" перечислимыми?Зачем?Потому что если бы вы дважды использовали ForEach поверх Enumerable.Range, вы бы получили 2-кратное число.

Если бы Enumerable.Range был Hot Enumerable, он бы выдал вам список только один раз, а 2nd ForEach будет пустым.

Для Rx Cold Observable означает, что каждый раз, когда вы звоните Subscribe (эквивалент ForEach в Rx), вы будете получать новый список вещей.Горячие Observables, такие как FromEvent, не будут давать вам новый поток событий каждый раз, когда вы подписываетесь, это просто будет еще одно соединение с тем же потоком событий.

В чем здесь преимущество Rx?Возможность преобразования этого диапазона в асинхронные запросы:

IObservable<Image> fetchImageByIndex(int imageIndexOnSite);

Observable.Range(0, 10)
    .SelectMany(x => fetchImageByIndex(x)) 
    .Subscribe(image => saveImageToDisk(image));
...