Rxjs - неожиданное поведение publishReplay + refCount после возврата refCount в 0 - PullRequest
0 голосов
/ 16 декабря 2018

Мой пример использования таков: я подключаюсь к службе с помощью веб-сокета и получаю периодические (но непредсказуемые) данные о работоспособности службы.Приложение может иметь несколько пользователей этого потока данных, поэтому я хочу поделиться им.Новые подписчики должны видеть самые последние данные о здоровье.Я также хочу закрыть веб-сокет, когда подписчиков больше нет.

Мое приложение использовало shareReplay(1) в течение достаточно долгого времени, пока не было обнаружено, что оно пропускает базовое соединение (https://blog.strongbrew.io/share-replay-issue/)., при которомТочка, которую мы изменили на pipe(publishReplay(1), refCount). Оказывается, у этого также есть тонкое, чего я не ожидал:

  1. Подписчик A подключается и соединение через веб-сокет установлено.
  2. Подписчик Bправильно подключается и делится, а также получает самые последние данные.
  3. Отключаются и A, и B. Отключается веб-розетка
  4. Подключается абонент C, но ему нужно только одно значение take(1).Возвращается значение, которое кэшируется publishReplay(1).

На шаге 4 я действительно хотел воссоздать веб-сокет. Кэшированное значение бесполезно. Параметр timewindow publishReplay заманчиво, но не совсем то, что я хочу.

Мне удалось найти решение, используя pipe(multicast(() => new ReplaySubject(1)), refCount()), но я не знаю Rx достаточно хорошо, чтобы понять все последствия этого.

У меня вопрос - какой лучший способ добиться желаемого поведения?

Спасибо!

Пример кода можно увидеть на https://repl.it/@bradb/MinorColdRouter Встроенный код

const { Observable, ReplaySubject } = require('rxjs');
const { tap, multicast, take, publishReplay, refCount } = require('rxjs/operators');

const log = console.log;

function eq(a, b) {
  let result = JSON.stringify(a) == JSON.stringify(b);
  if (!result) {
    log('eq failed', a, b);
  }
  return result;
}

function assert(cond, msg) {
  if (!cond) {
    log('****************************************');
    log('Assert failed: ', msg);
    log('****************************************');
  }
}

function delay(t) {
  return new Promise(resolve => {
    setTimeout(resolve, t);
  });
}

let liveCount = 0;

// emitValue 1 happens at 100ms, 2 at 200ms etc
function testSource() {
  return Observable.create(function(observer) {
    let emitValue = 1;
    liveCount++;
    log('create');
    let interv = setInterval(() => {
      log('next --------> ', emitValue);
      observer.next(emitValue);
      emitValue++;
    }, 100);

    return () => {
      liveCount--;
      log('destroy');
      clearInterval(interv);
    };
  });
}

async function doTest(name, o) {
  log('\nDOTEST: ', name);
  assert(liveCount === 0, 'Start off not live');
  let a_vals = [];
  o.pipe(take(4)).subscribe(x => {
    a_vals.push(x);
  });
  await delay(250);
  assert(liveCount === 1, 'Must be alive');

  let b_vals = [];
  o.pipe(take(2)).subscribe(x => {
    b_vals.push(x);
  });
  assert(liveCount === 1, 'Two subscribers, one source');
  await delay(500);
  assert(liveCount === 0, 'source is destroyed');
  assert(eq(a_vals, [1, 2, 3, 4]), 'a vals match');
  assert(eq(b_vals, [2, 3]), 'b vals match');

  let c_vals = [];
  o.pipe(take(2)).subscribe(x => {
    c_vals.push(x);
  });
  assert(liveCount === 1, 'Must be alive');

  await delay(300);
  assert(liveCount === 0, 'Destroyed');
  assert(eq(c_vals, [1, 2]), 'c_vals match');
}

async function main() {
  await doTest(
    'bad: cached value is stale',
    testSource().pipe(
      publishReplay(1),
      refCount()
    )
  );
  await doTest(
    'good: But why is this different to publish replay?',
    testSource().pipe(
      multicast(() => new ReplaySubject(1)),
      refCount()
    )
  );
  await doTest(
    'bad: But why is this different to the above?',
    testSource().pipe(
      multicast(new ReplaySubject(1)),
      refCount()
    )
  );
}
main();
...