Реактивное расширение rx: как заставить каждого подписчика получить различное значение (следующее) из наблюдаемой? - PullRequest
2 голосов
/ 02 февраля 2012

Используя реактивное расширение, легко подписаться 2 раза на одну и ту же наблюдаемую.Когда новое значение доступно в наблюдаемой, оба подписчика вызываются с одинаковым значением.

Есть ли способ, чтобы каждый подписчик получил другое значение (следующее) из этой наблюдаемой?

Из того, что я ищу:
исходная последовательность: [1,2,3,4,5, ...] (бесконечно)
Источник постоянно добавляет новые предметы с неизвестной скоростью.
Я пытаюсь выполнить длительное асинхронное действие для каждого элемента, используя N подписчиков.

1-й подписчик: 1,2,4, ...
2-й подписчик: 3,5, ...
...
или
1-й подписчик: 1,3, ...
2-й подписчик: 2,4,5, ...
...
или
1-й подписчик: 1,3,5, ...
2-й подписчик:2,4,6, ...

Ответы [ 2 ]

1 голос
/ 08 февраля 2012

Как насчет:

IObservable<TRet> SomeLengthyOperation(T input) 
{
    return Observable.Defer(() => Observable.Start(() => {
        return someCalculatedValueThatTookALongTime;
    }, Scheduler.TaskPoolScheduler));
}

someObservableSource
    .SelectMany(x => SomeLengthyOperation(input))
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

Вы даже можете ограничить количество одновременных операций:

someObservableSource
    .Select(x => SomeLengthyOperation(input))
    .Merge(4 /* at a time */)
    .Subscribe(x => Console.WriteLine("The result was {0}", x);

Для работы слияния (4) важно, чтобы Observable возвращалSomeLengthyOperation будет Cold Observable, что и делает отсрочку - это делает Observable.Start невозможным, пока кто-то не подпишется.

1 голос
/ 06 февраля 2012

Я бы согласился с Асти.

Вы можете использовать Rx, чтобы заполнить очередь (блокирующую коллекцию), а затем сделать так, чтобы конкурирующие потребители читали из очереди.Таким образом, если один процесс по какой-то причине был быстрее, он мог бы забрать следующий элемент потенциально перед другим потребителем, если бы он все еще был занят.

Однако, если вы хотите сделать это, вопреки добрым советам :), тогдаВы можете просто использовать оператор Select, который предоставит вам индекс каждого элемента.Затем вы можете передать это своим подписчикам, и они могут соответствовать модулю.(Блин! Утечка абстракций, магические числа, потенциальная блокировка, потенциальные побочные эффекты для исходной последовательности и т. Д.)

var source = Obserservable.Interval(1.Seconds())
  .Select((i,element)=>{new Index=i, Element=element});

var subscription1 = source.Where(x=>x.Index%2==0).Subscribe(x=>DoWithThing1(x.Element));
var subscription2 = source.Where(x=>x.Index%2==1).Subscribe(x=>DoWithThing2(x.Element));

Также помните, что работа, выполняемая с обработчиком OnNext, если он блокирует, все равно будет блокировать планировщикэто находится на.Это может повлиять на скорость вашего источника / производителя.Еще одна причина, почему ответ Асти является лучшим вариантом.

Спросите, если это не ясно: -)

...