Это сложно, но я думаю, что это выполнимо, комбинируя concatMap
и takeUntil
. Я оставил несколько журналов, чтобы сделать более понятным, как все это работает.
const randomStream$ = defer(() => of(null).pipe(
delay(Math.random() * 10000),
)).pipe(take(1), repeat());
let hIndex = 0;
let lIndex = 0;
const high$ = randomStream$.pipe(map(v => `H ${hIndex++}`));
const low$ = randomStream$.pipe(map(v => `L ${lIndex++}`));
merge(high$, low$).pipe(
concatMap(v => {
const sound$ = of(v).pipe(
delay(5000),
);
if (v[0] === 'H') { // Item from the high priority stream
return sound$;
} else {
return sound$.pipe(
takeUntil(high$.pipe(
tap(() => console.log(`${v} canceled`))
)),
);
}
}),
).subscribe(v => console.log(`${v} done`));
Демонстрационная версия: https://stackblitz.com/edit/rxjs-jdbkhb
Таким образом, все излучения от high$
и low$
превращаются в sound$
, что занимает 5 с. Затем логика разбивается на разные типы потоков:
Поток с высоким приоритетом только что вернулся, и благодаря concatMap
действительно требуется 5 с для sound$
завершения.
Поток с низким приоритетом связан с takeUntil
, поэтому, когда выдается еще один high$
, он немедленно завершается, и окружающий concatMap
начинает обработку другого. Если приходит low$
, то он ставится в очередь на concatMap
.
Кстати, я не знаю, хотите ли вы ставить в очередь выбросы из low$
и high$
, потому что если у вас есть несколько low$
буферизованных, то они будут обрабатываться в том же порядке, в котором они поступили вместе с выбросами от high$
.
Так что, возможно, вы захотите игнорировать все выбросы с low$
, в то время как есть ожидающие выбросы с high$
, но для этого потребуется, я думаю, один побочный эффект (переменный вне цепочки), который будет подсчитывать ожидающие выбросы от high$
(или, может быть, просто верните EMPTY
из concatMap
, когда ожидающие ожидающие элементы с высоким приоритетом ожидают?).