Выполнить функцию по наблюдаемой отмене - PullRequest
2 голосов
/ 04 мая 2019

Я хочу заметить, что при отписке он вызывает функцию, но только тогда, когда отписался без ошибок и без завершения. Наблюдаемое, которое я пытаюсь построить, обычно сопровождается другим наблюдаемым. Я хочу, когда другой наблюдаемый "выигрывает", этот выполняет функцию.

Я пытался завершить оператор, но он выполняется всегда.

playback.ts

import { timer } from "rxjs";
import { takeUntil, finalize } from "rxjs/operators";
import errorobs$ from "./errorobs";

export default function() {
  return timer(10000).pipe(
    takeUntil(errorobs$),
    finalize(finalFunc)
  );
}

function finalFunc() {
  console.log("final function executed");
}

errorobs.ts

import { fromEvent } from "rxjs";
import { map } from "rxjs/operators";

export default fromEvent(document.getElementById("errorBtn"), "click").pipe(
  map(() => {
    throw new Error("my error");
  })
);

Я сделал небольшое демо здесь https://codesandbox.io/s/q7pwowm4l6

нажмите кнопку Пуск, чтобы начать «наблюдаемое».

нажмите "Отмена", чтобы получить другой наблюдаемый выигрыш

нажмите на ошибку, чтобы создать ошибку

Ответы [ 2 ]

1 голос
/ 04 мая 2019

Это действительно работает, как вы это описали.finalize выполняется при удалении цепочки, когда все подписчики отписываются, когда происходит ошибка цепочки или когда она завершается.

На странице RxJS Github уже есть проблема для этой функции: https://github.com/ReactiveX/rxjs/issues/2823

В ссылке выше вы можете увидеть пример пользовательского оператора, который добавляет причину к оператору finalize.

Мне пришлось самому разобраться с этим вариантом использования и добавить этот оператор в мою собственную коллекцию операторов RxJS: https://github.com/martinsik/rxjs-extra/blob/master/doc/finalizeWithReason.md

1 голос
/ 04 мая 2019

Один из способов добиться этого - использовать пользовательский оператор, как мой onCancel() ниже:

const {Observable} = rxjs

function onCancel(f) {
  return observable => new Observable(observer => {
    let completed = false
    let errored = false
    const subscription = observable.subscribe({
      next: v => observer.next(v),
      error: e => {
        errored = true
        observer.error(e)
      },
      complete: () => {
        completed = true
        observer.complete()
      }
    })
    return () => {
      subscription.unsubscribe()
      if (!completed && !errored) f()
    }
  })
}

// Test:
const {interval} = rxjs
const {take} = rxjs.operators

// This one gets cancelled:
const s = interval(200).pipe(
  onCancel(() => console.warn('s cancelled!'))
).subscribe(() => {})
setTimeout(() => s.unsubscribe(), 500) 

// This one completes before unsubscribe():
const q = interval(200).pipe(
  take(2),
  onCancel(() => console.warn('q cancelled!'))
).subscribe(() => {})
setTimeout(() => q.unsubscribe(), 500)
<script src="//unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
...