Есть ли способ завершить поток после пропуска окончательного значения? - PullRequest
0 голосов
/ 19 января 2019

У меня есть исходный поток, который не завершается, но мне нужен оператор, который завершит работу наблюдателей после того, как полученное значение удовлетворяет некоторым требованиям, чтобы сделать его последним элементом для наблюдателей. На данный момент мой код просто использует takeUntil() для наблюдаемой, которая генерируется, когда значение из источника соответствует критериям.

const source: Observable<any> = nonCompletingStream();
const conditionSatisfied: Observable<boolean> = source.pipe(
    map(val => meetsCriteriaToBeFinalValue(val)),
    skipWhile(meetsCriteria => meetsCriteria === false),
    shareReplay(),  // so that this emits after the fact
    take(1) // only needs to know if the criteria is met once
);
const example: Observable<any> = source.pipe(
    takeUntil(conditionSatisfied)
);

example.subscribe(
    val => { // does stuff with individual values },
    err => { // handles error },
    () => { // this gets triggered one item too early }
);

Проблема с этим заключается в том, что, когда проходит элемент, соответствующий критериям (вызывающий conditionSatisfied), наблюдатель на example завершает работу до того, как «последнее значение» сможет перейти к обратному вызову next. Есть ли способ пропустить окончательное значение до наблюдателя перед завершением потока?

При чтении параметров expand выглядит примерно так, как мне нужно, позволяя значениям проходить до того, как рекурсивно отправлять новые наблюдаемые в поток. Однако, хотя он может добавлять значения в поток, не похоже, что он может завершить поток.

РЕДАКТИРОВАТЬ: изменил название вопроса, чтобы быть более общим

Ответы [ 2 ]

0 голосов
/ 19 января 2019

Изменяя ваш поток на горячее наблюдаемое, вы можете напрямую управлять его жизненным циклом.

const source = nonCompletingStream().pipe(publish());

Запустите подписку, подключив наблюдаемый источник:

const example = source.subscribe(...);
const subscription = source.connect();

И отмените подписку, отменив подписку на наблюдаемый источник:

source.pipe(filter(meetsCriteriaToBeFinalValue)).subscribe(() => subscription.unsubscribe());

Живой пример:

const { interval } = rxjs;
const { publish, filter } = rxjs.operators;

const source$ = interval(100).pipe(publish());
const stopCriteria = x => x === 3;

const example$ = source$.subscribe(console.log);

const subscription = source$.connect();

source$.pipe(
  filter(stopCriteria)
).subscribe(() => subscription.unsubscribe());
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.3.3/rxjs.umd.js"></script>
0 голосов
/ 19 января 2019

Вы можете conditionSatisfied выдавать исходное значение вместо логического значения, а затем объединять его с наблюдаемой, которая завершается на основе условия:

const source = nonCompletingStream();
const conditionSatisfied = source.pipe(
    filter(val => meetsCriteriaToBeFinalValue(val)),
    shareReplay(),
    take(1)
);
const example = concat(
  source.pipe(
      takeUntil(conditionSatisfied)
  ),
  conditionSatisfied
)

Здесь продемонстрировано https://stackblitz.com/edit/rxjs-qydhh3

...