Предположим, я хочу, чтобы наблюдаемая периодически генерировала значение до тех пор, пока не появится другая наблюдаемая. Так что я могу использовать timer
и takeUntil
, чтобы добиться этого. Но затем я хочу обработать каждое испущенное значение и прекратить (ошибку) испускание, когда какое-либо условие становится истинным. Поэтому я пишу следующий фрагмент кода:
const { timer, Subject } = 'rxjs';
const { takeUntil, map } = 'rxjs/operators';
const s = new Subject();
let count = 0;
function processItem(item) {
count++;
return count < 3;
}
const value = "MyValue";
timer(0, 1000).pipe(
takeUntil(s),
map(() => {
if (processItem(value)) {
console.log(`Processing done`);
return value;
} else {
s.next(true);
s.complete();
console.error(`Processing error`);
throw new Error(`Stop pipe`);
}
}),
)
Детская площадка
Но вместо того, чтобы получить ошибку, я завершил свой Observable. Только если я закомментирую оператор takeUntil(s)
, я получаю ошибку. Похоже, когда оператор pipe завершает работу, его значение не выдается сразу, а запоминается и выдается в конце следующей «итерации» канала, затем заменяется новым результатом и так далее. И в моей ситуации следующая итерация, когда должна появиться ошибка, предотвращается takeUntil
. Вопрос в том, правильно ли я с этим предположением, и если да, то почему rxjs разработан таким образом?