Я разработал стратегию, отличную от Cartant, и явно гораздо менее элегантную, что может дать вам несколько иной результат.Я прошу прощения, если я не понял вопроса и если мой ответ окажется бесполезным.
Моя стратегия основана на использовании switchMap
на a $ и затем bufferTime
на b $ .
Этот код генерируется при каждом timeInterval
и генерирует объект, который содержит последний a полученный и массив b s, представляющие b s, полученные в течение интервала времени.
a$.pipe(
switchMap(a => {
return b$.pipe(
bufferTime(timeInterval),
mergeMap(arrayOfB => of({a, arrayOfB})),
)
})
)
Если arrayOfB
пусто, это означает, что последние a в несоответствии.
Если arrayOfB
имеет только один элемент, это означает, что последнему a соответствует b массива.
ЕслиarrayOfB
имеет более одного элемента, это означает, что последнему a соответствует первый b массива, в то время как все остальные b s
Теперь нужно избегать выброса одного и того же a более одного раза, и здесь код становится немного грязным.
В итоге,TКод может выглядеть следующим образом
const a$ = new Subject();
const b$ = new Subject();
setTimeout(() => a$.next("a1"), 0);
setTimeout(() => b$.next("b1"), 0);
setTimeout(() => a$.next("a2"), 100);
setTimeout(() => b$.next("b2"), 125);
setTimeout(() => a$.next("a3"), 200);
setTimeout(() => b$.next("b3"), 275);
setTimeout(() => a$.next("a4"), 400);
setTimeout(() => b$.next("b4"), 425);
setTimeout(() => b$.next("b4.1"), 435);
setTimeout(() => a$.next("a5"), 500);
setTimeout(() => b$.next("b5"), 575);
setTimeout(() => b$.next("b6"), 700);
setTimeout(() => b$.next("b6.1"), 701);
setTimeout(() => b$.next("b6.2"), 702);
setTimeout(() => a$.next("a6"), 800);
setTimeout(() => a$.complete(), 1000);
setTimeout(() => b$.complete(), 1000);
let currentA;
a$.pipe(
switchMap(a => {
currentA = a;
return b$.pipe(
bufferTime(50),
mergeMap(arrayOfB => {
let aVal = currentA ? currentA : null;
if (arrayOfB.length === 0) {
const ret = of({a: aVal, b: null})
currentA = null;
return ret;
}
if (arrayOfB.length === 1) {
const ret = of({a: aVal, b: arrayOfB[0]})
currentA = null;
return ret;
}
const ret = from(arrayOfB)
.pipe(
map((b, _indexB) => {
aVal = _indexB > 0 ? null : aVal;
return {a: aVal, b}
})
)
currentA = null;
return ret;
}),
filter(data => data.a !== null || data.b !== null)
)
})
)
.subscribe(console.log);