У меня есть исходный поток, который не завершается, но мне нужен оператор, который завершит работу наблюдателей после того, как полученное значение удовлетворяет некоторым требованиям, чтобы сделать его последним элементом для наблюдателей. На данный момент мой код просто использует takeUntil()
для наблюдаемой, которая генерируется, когда значение из источника соответствует критериям.
const source: Observable<any> = nonCompletingStream();
const conditionSatisfied: Observable<boolean> = source.pipe(
map(val => meetsCriteriaToBeFinalValue(val)),
skipWhile(meetsCriteria => meetsCriteria === false),
shareReplay(), // so that this emits after the fact
take(1) // only needs to know if the criteria is met once
);
const example: Observable<any> = source.pipe(
takeUntil(conditionSatisfied)
);
example.subscribe(
val => { // does stuff with individual values },
err => { // handles error },
() => { // this gets triggered one item too early }
);
Проблема с этим заключается в том, что, когда проходит элемент, соответствующий критериям (вызывающий conditionSatisfied
), наблюдатель на example
завершает работу до того, как «последнее значение» сможет перейти к обратному вызову next
. Есть ли способ пропустить окончательное значение до наблюдателя перед завершением потока?
При чтении параметров expand
выглядит примерно так, как мне нужно, позволяя значениям проходить до того, как рекурсивно отправлять новые наблюдаемые в поток. Однако, хотя он может добавлять значения в поток, не похоже, что он может завершить поток.
РЕДАКТИРОВАТЬ: изменил название вопроса, чтобы быть более общим