Как я могу отменить подписку или отменить фильтрацию большого массива, который является наблюдаемым RxJS? - PullRequest
0 голосов
/ 22 мая 2018

Насколько я понимаю, весь массив передается подписчику, в отличие от, скажем, наблюдателя интервала, который можно отписать / отменить.

Например, работает следующая отмена ...

// emit a value every second for approx 10 seconds
let obs = Rx.Observable.interval(1000)
  .take(10)
let sub = obs.subscribe(console.log);

// but cancel after approx 4 seconds
setTimeout(() => {
  console.log('cancelling');
  sub.unsubscribe()
}, 4000);
<script src="https://unpkg.com/rxjs@5.5.10/bundles/Rx.min.js"></script>

Однако заменить интервал на массив нельзя.

// emit a range
let largeArray = [...Array(9999).keys()];
let obs = Rx.Observable.from(largeArray)
let sub = obs.subscribe(console.log);

// but cancel after approx 1ms
setTimeout(() => {
  console.log('cancelling');
  sub.unsubscribe()
}, 1);

// ... doesn't cancel
<script src="https://unpkg.com/rxjs@5.5.10/bundles/Rx.min.js"></script>

Необходимо ли каким-либо образом сделать каждый элемент асинхронным, например, поместив его в setTimeout (..., 0)?Возможно, я слишком долго смотрел на эту проблему и совершенно не в курсе, что обработку массива можно отменить?

Ответы [ 2 ]

0 голосов
/ 22 мая 2018

Я отметил правильный ответ @ bygrace.Очень признателен!Как упоминалось в комментарии к его ответу, я публикую пользовательскую реализацию наблюдаемой, которая поддерживает такую ​​отмену для интереса ...

const observable = stream => {
  let timerID;

  return {
    subscribe: observer => {
      timerID = setInterval(() => {
        if (stream.length === 0) {
          observer.complete();
          clearInterval(timerID);
          timerID = undefined;
        }
        else {
          observer.next(stream.shift());
        }
      }, 0);

      return {
        unsubscribe: () => {
          if (timerID) {
            clearInterval(timerID);
            timerID = undefined;
            observer.cancelled();
          }
        }
      }
    }
  }
}

// will count to 9999 in the console ...
let largeArray =  [...Array(9999).keys()];

let obs = observable(largeArray);
let sub = obs.subscribe({
  next: a => console.log(a),
  cancelled: () => console.log('cancelled')
});

// except I cancel it here
setTimeout(sub.unsubscribe, 200);
0 голосов
/ 22 мая 2018

При использовании from(...) в массиве все значения будут передаваться синхронно, что не позволяет предоставлять какое-либо время выполнения для setTimeout, который вы используете для отмены подписки.Фактически, он заканчивает излучение до того, как линия для setTimeout будет достигнута.Чтобы позволить эмиттам не загружать поток, вы можете использовать асинхронный планировщик (from(..., Rx.Scheduler.async)), который будет планировать работу с использованием setInterval.

Вот документы: https://github.com/ReactiveX/rxjs/blob/master/doc/scheduler.md#scheduler-types

Здесьэто бегущий пример.Мне пришлось увеличить время ожидания до 100, чтобы дать больше места для дыхания.Это замедлит ваше исполнение, конечно.Я не знаю причину, по которой вы пытаетесь это сделать.Возможно, мы могли бы дать вам лучший совет, если бы вы могли поделиться точным вариантом использования.

// emit a range
let largeArray = [...Array(9999).keys()];
let obs = Rx.Observable.from(largeArray, Rx.Scheduler.async);
let sub = obs.subscribe(console.log);

// but cancel after approx 1ms
setTimeout(() => {
  console.log('cancelling');
  sub.unsubscribe()
}, 100);

// ... doesn't cancel
<script src="https://unpkg.com/rxjs@5.5.10/bundles/Rx.min.js"></script>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...