Почему rxjs-fetch закрывает Rx GroupObservable? - PullRequest
0 голосов
/ 20 февраля 2019

Когда я использую rxjs-fetch для добавления информации в GroupObservable, она исчерпывается.(дальнейшее отображение работает на пустой Observable)

Пусть код говорит:

import Rx from "rxjs";
import rxFetch from 'rxjs-fetch'
import chai from "chai";

const expect = chai.expect;

describe('General', () => {
  it('combine group and rxfetch', (done)=>{
    Rx.Observable.from([1,2,3,4,5,6])
        .groupBy(v=>v%2)
        .do(modGroup => console.log(modGroup.key, modGroup.groupSubject.isStopped))
        .flatMap(modGroup => modGroup.reduce((a,b) => a+b, 0))
        .do(console.log)
        .subscribe(()=>{},done,done);
  })
});

Это выводит как ожидалось:

1 false
0 false
9
12

Теперь добавим несколько rxjs-fetch,это не должно причинить никакого вреда:

import Rx from "rxjs";
import rxFetch from 'rxjs-fetch'
import chai from "chai";

const expect = chai.expect;

describe('General', () => {
  it('combine group and rxfetch', (done)=>{
    Rx.Observable.from([1,2,3,4,5,6])
        .groupBy(v=>v%2)
        .do(modGroup => console.log(modGroup.key, modGroup.groupSubject.isStopped))
        .flatMap(modGroup =>
          rxFetch("https://my-json-server.typicode.com/typicode/demo/posts")
              .json().map(r => modGroup)
        )
        .do(modGroup => console.log(modGroup.key, modGroup.groupSubject.isStopped))
        .flatMap(modGroup => modGroup.reduce((a,b) => a+b, 0))
        .do(console.log)
        .subscribe(()=>{},done,done);
  })
});

Но удивить вывод:

1 false
0 false
1 true
0 true
0
0

Два true указывают, что GroupObservable остановлен и не будет выдавать никаких дальнейших значений,Все, что я сделал, это извлек что-то через rxjs-fetch.

В моем примере из реального мира ключ группы становится частью URL-адреса запроса, и отображение немного более изящное, я сократил его до минимально возможного сбоя.пример.

Любая идея, как я могу добиться rxjs-fetch без потери groupObservable?

1 Ответ

0 голосов
/ 20 февраля 2019

Кажется, что причина 0, 0 и закрытых Observables следующая:

Предположим, rxFetch для получения значения требуется 5сек .В то время как источник .from([...]) занимает 0сек .Теперь, когда вы map rxFetch ответите на groupMod - вы отобразите его 5сек после того, как .from закончится.Итак, ваши начальные наблюдаемые группы завершены и не имеют значений.

Вот своего рода мраморная диаграмма:

.из:

1
2|
3

rxFetch

`----R|

результат

`-----|

      ^ here we resubscribe to source groups

Вот небольшой пример, иллюстрирующий, что https://observable -playground.github.io / gist / 43d54c2c60acce4b580ed91d84f51789

Попробуйте оба .from и .timer примеров, а также попробуйте увеличить время асинхронной задачи.

(также есть предложенное решение, см. ниже)

Как решить эту проблему.Ну, не очень понятно, каковы ваши большие намерения, поэтому я предполагаю, что вам нужен ответ rxFetch вместе с сгруппированными значениями - тогда вы можете использовать mergeMap, чтобы сделать запрос для каждого значения

.flatMap(g =>
  g
    .mergeMap(v => rxFetch('https://example.com/' + g.key), (a,b)=> a + b)
)

Ключевой идеей здесь является то, что мы работаем с группами, которые объединены с rxFetch и ждут ответа;вместо непосредственного возврата завершенных групп.

Надеюсь, это поможет

...