Наблюдаемый: пропустить (количество подписчиков) - PullRequest
0 голосов
/ 22 мая 2019

У меня есть Observable, который испускается, когда вызывается обратный вызов внешнего API. Я хотел бы пропустить (n) выбросы, где n - количество подписчиков, подписавшихся на наблюдаемое.

Например: подписчик, подписавшийся на 2-й, должен получить только второе излучение, а затем отказаться от подписки.

Оператор пропуска не работает, так как количество подписок может измениться.

https://stackblitz.com/edit/rxjs-qdnh9f

let toSkip = 0;
const source = () => {
  return Observable.create((observer) => {
    toSkip++;
    // External API callback
    const handler = (count) => () => {
      observer.next(count++);
    };
    const interval = setInterval(handler(1), 1000)
    const unsubscribe = () => {
      toSkip--;
      console.log('clear interval');
      clearInterval(interval)
    }
    observer.add(unsubscribe);
  }).pipe(
    skip(toSkip), 
    take(1)
  );
}


const subscription1 = source().subscribe(x => console.log('subscription1', x));
const subscription2 = source().subscribe(x => console.log('subscription2', x));
// subscription3 should emit "2" as subscription2 will unsubscribe never run
const subscription3 = source().subscribe(x => console.log('subscription3', x));

setTimeout(() => {
  subscription2.unsubscribe();
}, 500);

Подписка3 должна выдавать «2», так как подписка2 будет отписываться перед вызовом.

Ожидаемый вывод на консоль:

clear interval
subscription1 1
clear interval
subscription3 2
clear interval

Ответы [ 2 ]

0 голосов
/ 22 мая 2019

Пропуск имеет статический аргумент, но в этой ситуации мы должны использовать динамически изменяемую переменную, поэтому необходимо изменить оператор.

Также мы не можем назвать функцию внутри создания наблюдателя как unsubscribe, потому что она вызывается после завершения.Мы не можем отследить, сколько раз мы отписались, поэтому мы можем вернуть упакованный метод, который сделает это за нас.

https://stackblitz.com/edit/rxjs-bpfcgm - проверьте несколько случаев

import { Observable } from 'rxjs'; 
import { map, take, filter } from 'rxjs/operators';


const getSource = function() {

  let inc = 0;
  let unsubscribed = 0;

  const source = () => {
    inc++;

    let created = inc;
    let handlerCount = 0;

    return Observable.create((observer) => {

      // External API callback
      const handler = (count) => () => {
        handlerCount++;
        observer.next(count++); // Emit any value here
      };
      const interval = setInterval(handler(1), 1000)
      const complete = () => {
        console.log('clear interval');
        clearInterval(interval)
      }
      return complete;
    }).pipe(
      filter(() => handlerCount >= created - unsubscribed), 
      take(1)
    );
  }

  const unsubscribe = o => {
    unsubscribed++;
    o.unsubscribe();
  }

  return [source, unsubscribe];
}

let [source, unsubscribe] = getSource();

const subscription1 = source().subscribe(x => console.log('subscription1', x));
const subscription2 = source().subscribe(x => console.log('subscription2', x));
// subscription3 should emit "2" as subscription2 will unsubscribe never run
const subscription3 = source().subscribe(x => console.log('subscription3', x));

setTimeout(() => {
  unsubscribe(subscription2);
}, 500)

0 голосов
/ 22 мая 2019

Пропустить работает, Ваша первая подписка1 пропускает 1 значение и принимает 1 (0 пропущено 1 получено) subscription3 пропускает значение 3 (0,1,2) и принимает 1 (то есть 3). Почему должно быть 2?

.pipe(
skip(toSkip), 
take(1)

Этот раздел выполняется один раз, когда создается источник Observable, и начальное значение больше не изменяется. И не имеет значения, что toSkip был уменьшен последним, источник 3 был инициирован со значением skip 3.

Также следует помнить, что каждая новая подписка для одного и того же наблюдателя выполняет этот код

    toSkip++;
    // External API callback
    const handler = (count) => () => {
      observer.next(count++);
    };
    const interval = setInterval(handler(1), 1000)
    const unsubscribe = () => {
      toSkip--;
      console.log('clear interval');
      clearInterval(interval)
    }
    observer.add(unsubscribe);

Это означает, что ToSkip будет увеличиваться для каждой новой подписки. например, этот код также увеличивает ToSkip на 2 единицы.

var source = source();
const subscription1 = source.subscribe(x => console.log('subscription1', x));
const subscription1_1 = source.subscribe(x => console.log('subscription1_1', x));

Также take (1) автоматически завершает сбор и отменяет подписку на всех подписчиков, что также вызывает ваше событие отмены подписки. Вы можете использовать фильтр вместо пропуска из-за его динамической природы, но использование переменных с состоянием данных в наблюдаемой коллекции является плохой практикой. это не корпоративное решение:

import { Observable } from 'rxjs'; 
import { map, skip, take, filter } from 'rxjs/operators';

let toSkip = 0;
const source = () => {
  let init;
  return Observable.create((observer) => {
    toSkip++;
    init = toSkip;
    // External API callback
    const handler = (count) => () => {

      observer.next(count++);
      console.log('count ' + count);
    };
    const interval = setInterval(handler(1), 1000)
    const unsubscribe = () => {    

      console.log(' clear interval ' + toSkip);
      clearInterval(interval)
    }
    observer.add(unsubscribe);
    console.log('skip ' + toSkip);
  }).pipe(
    filter((x) =>
    {
      console.log(x + ' - ' + toSkip);
       return x == init || x == toSkip
       }),
       take(1)
       );
}

const subscription1 = source().subscribe(x => {
   console.log('subscription1', x);   
   });

const subscription2 = source().subscribe(x => { 
  console.log('subscription2', x);

});
// subscription3 should emit "2" as subscription2 will unsubscribe never run
const subscription3 = source().subscribe(x => {
   console.log('subscription3', x)

});

setTimeout(() => {
   toSkip--;
  subscription2.unsubscribe();
}, 500)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...