Для протокола, я выкладываю решение, альтернативное принятому ответу.
ПРИМЕЧАНИЕ: для конечных потоков concat(of(1))
требуется до scan()
, для distinctUntilChanged
для испускания последней наблюдаемой.
const {of, from, ReplaySubject} = rxjs;
const {map, concat, scan, distinctUntilChanged, concatAll, toArray, delay} = rxjs.operators;
//considering infinite stream
const stream = from([1,2,3,1,2,1,1,1,2]);
stream.pipe(
scan( (acc, val) => {
if(val===1){
acc.complete()
acc=new ReplaySubject();
}
acc.next(val);
return acc;
}, new ReplaySubject()),
distinctUntilChanged(),
map(toArray()),
concatAll() );
Было бы неплохо собрать отзывы о предпочтительном решении.