rxjs - прекратить наблюдаемое с тематикой подписки - PullRequest
0 голосов
/ 11 декабря 2018

Я создал Observable, генерирующий потенциально бесконечный объем данных (например, таймер).Доступ к этим данным осуществляется через субъект, поэтому несколько наблюдателей получили бы одинаковые значения.

Как остановить Наблюдение, генерирующее новые значения?(без изменения реализации Observable)

// custom Observable, to visualize internal behavior
const timer$ = new rxjs.Observable(subscriber => {
  console.log("observable init");

  var counter = 0;
  const intervalId = setInterval(() => {
    ++counter;
    console.log("observable %s",counter);
    subscriber.next(counter);
  }, 1000);

  return () => {
    console.log("observable teardown");
    clearTimeout(intervalId);
  }
});

// subscribe through a subject
const subject$ = new rxjs.Subject();
timer$.subscribe(subject$);

const subscription = subject$.subscribe(value => console.log("observer %s", value));

// cancel subscription
setTimeout(() => {
  console.log("unsubscribe observer");
  subscription.unsubscribe();
  // TODO how to stop Observable generating new values?
}, 3000);

jsfiddle: https://jsfiddle.net/gy4tfd5w/

Ответы [ 2 ]

0 голосов
/ 15 февраля 2019

Итак, после некоторого исследования НАКАЗАНИЯ я добавил собственную библиотеку npm для этой проблемы.

Improves previous answer by NOT having to add any extra convolution variables and ease of use.

enter image description here

0 голосов
/ 11 декабря 2018

К счастью, в RxJS есть специальный и элегантный способ решения этой проблемы.

Вам необходимо, чтобы

несколько наблюдателей [...] получали одинаковые значения

Это называется многоадресной наблюдаемой, и есть определенные операторы , которые используются для создания одного из обычных "холодных" наблюдаемых.

Например, вместо того, чтобы создавать экземпляр Subject напрямую, вы можете просто передать наблюдаемый источник оператору share, и он создаст для вас Subject.Документация share гласит:

Возвращает новый объект Observable, который группирует (разделяет) исходный объект Observable.До тех пор, пока существует хотя бы один подписчик, эта наблюдаемая будет подписана и будет передавать данные.Когда все подписчики отписались, он отписывается от источника Observable.

Последнее предложение показывает небольшую разницу между share и source$.subscribe(subject)share сохраняется так называемое refCount, которое автоматически отписывает Subject от его источника, когда на нем не осталось подписчиков.

Применительно к вашему коду это выглядит так:

const timer$ = 
    new rxjs.Observable(subscriber => {// your unchanged implementation})
    .pipe(
        share()
    );

const subscription = timer$.subscribe(value => console.log("observer %s", value));

Вот полная версия с примером кода:

https://jsfiddle.net/50q746ad/

Кстати, share - не единственный оператор, выполняющий многоадресную передачу. Существуют отличные учебные ресурсы , которые намного углубляются в эту тему.

...