Поспешная альтернатива forkjoin rx js для заметного сцепления? - PullRequest
6 голосов
/ 08 января 2020

У меня есть 5 различных вызовов API, и все они прямо сейчас связаны в forkJoin. Мое новое требование заключается в том, что подписка должна запускаться в любое время при любых новых наблюдаемых решениях.

Есть ли в rx js какой-либо оператор или какой-либо другой трюк, в котором я могу сохранить цепочку, но он должен срабатывать каждый раз, когда обнаруживаются какие-либо наблюдаемые решения?

forkJoin(
        this.myService.api1(),
        this.myService.api2(),
        this.myService.api3(),
        this.myService.api4(),
        this.myService.api5()
    )
        .subscribe(
            ([r1,r2,r3,r4,r5]) => { ... do something })

Ответы [ 2 ]

1 голос
/ 08 января 2020

Вы можете использовать combineLatest для выдачи последнего значения из каждого наблюдаемого источника. Он не будет излучаться до тех пор, пока каждый наблюдаемый источник не издаст хотя бы один раз, поэтому вы можете использовать startWith для предоставления начального значения:

combineLatest(
        this.myService.api1().pipe(startWith(null)),
        this.myService.api2().pipe(startWith(null)),
        this.myService.api3().pipe(startWith(null)),
        this.myService.api4().pipe(startWith(null)),
        this.myService.api5().pipe(startWith(null))
    )
        .subscribe(
            ([r1,r2,r3,r4,r5]) => { ... do something })

Начальный вывод будет [null, null, null, null, null]. Когда каждое наблюдаемое излучает, оно заменит соответствующее значение null в массиве.

Если вы хотите игнорировать начальное излучение, вы можете использовать skip(1).

const sourceOne = of('Hello').pipe(delay(1000));
const sourceTwo = of('World!').pipe(delay(2000));
const sourceThree = of('Goodbye').pipe(delay(3000));
const sourceFour = of('World!').pipe(delay(4000));

//wait until all observables have emitted a value then emit all as an array
const example = combineLatest(
  sourceOne.pipe(startWith(null)),
  sourceTwo.pipe(startWith(null)),
  sourceThree.pipe(startWith(null)),
  sourceFour.pipe(startWith(null))
)
.pipe(skip(1));

//output:
//["Hello", null, null, null]
//["Hello", "World!", null null]
//["Hello", "World!", "Goodbye", null]
//["Hello", "World!", "Goodbye", "World!"]
//Complete
const subscribe = example.subscribe(val => console.log(val), null, () => console.log('Complete'));

И вот StackBlitz , чтобы попробовать его.

1 голос
/ 08 января 2020

Вы можете использовать merge для одновременного выполнения ваших наблюдаемых объектов, таких как forkJoin, но немедленно отправлять их значения. Для отслеживания порядка добавьте индекс наблюдаемого к его выводу с помощью map. Используйте scan для отслеживания предыдущих значений, вставьте текущие значения в правильную позицию в массиве и отправьте накопленные данные.

export function forkJoinEarly(...sources: Observable<any>[]): Observable<any[]> {
  return merge(...sources.map((obs, index) => obs.pipe(
    // optional: only emit last value like forkJoin
    last(), 
    // add the index of the observable to the output
    map(value => ({ index, value })) 
  ))).pipe(
    // use scan to keep track of previous values and insert current values
    scan((acc, curr) => (acc[curr.index] = curr.value, acc), Array(sources.length).fill(undefined))
  );
}

https://stackblitz.com/edit/rxjs-gwch8m

...