В RX как объединить два источника разных типов - PullRequest
3 голосов
/ 21 января 2012

Настройка:

  • Первый IObservable производит значения типа A
  • Second IObservable создает значения типа B
  • Они производят ценность в разном темпе (довольно быстро, каждые 10 мс)

Чего я пытаюсь достичь:

Каждый N раз (N довольно медленный, около 500 мс), вызов должен быть сделан на службу и предоставлять последние значения как Первый и Второй IObservable.

Вопрос: Интересно, как я могу это сделать, используя RX.

Текущее решение (не работает) :

var stateObs = from drag in dragObs.MostRecent(0).ToObservable()
                from roll in rollObs.MostRecent(0).ToObservable()
                select new ClientState
                            {
                                FileDragPerc = drag,
                                PhoneRoll = roll,
                                PendingFileType = FileType.Image,
                                TransferState = TransferState.SelectiveTransfer
                            };

stateObs.Sample(TimeSpan.FromMilliseconds(300))
        .Subscribe(x => _lsService.SetClientStateAsync(x),
                    x => Debug.WriteLine("Error in observable "),
                    () => Debug.WriteLine("Error observable finished! "));

Ответы [ 2 ]

2 голосов
/ 21 января 2012

Вы правы. Вот что делает оператор CombineLatest:

A: 1...2...3...4...5...

B: a.....b.........c...

Последнее значение любой последовательности сохраняется для создания пары, на которую действует селектор. Выходной поток будет (1,a) (2,a) (2,b) (3,b) и т. Д.

Если вам нужно создать правильные пары из любого потока, используйте оператор 'Zip', который даст вам (1,a) (2,b) (3,c) и т. Д.

приписка

Я бы предложил попытаться лучше понять, как компилятор переписывает понимание запросов. Это разрешит большую часть вашей путаницы.

from a in oA
from b in oB
select ...

эффективно SelectMany(oA, oB)

1 голос
/ 21 января 2012

Я думаю, что нашел ответ сам

Операция CombineLatest () делает то, что мне нужно, Вот что я получаю:

var stateObs = dragObs.CombineLatest(rollObs, (d, r) => new ClientState
                                                       {
                                                           FileDragPerc = d,
                                                           PhoneRoll = r,
                                                           TransferState = TransferState.SelectiveTransfer,
                                                           PendingFileType = FileType.Image
                                                       });

    stateObs.Sample(TimeSpan.FromMilliseconds(300))
            .Subscribe(x => _lsService.SetClientStateAsync(x),
                        x => Debug.WriteLine("Error in observable "),
                        () => Debug.WriteLine("Error observable finished! "));
...