RxJS - получение данных в пакетах времени + макс. - PullRequest
0 голосов
/ 11 ноября 2018

Я играю с RxJS в первый раз и пытаюсь создать простую наблюдаемую запись, которая будет принимать максимальное количество записей и , ожидая промежуток времени, прежде чем пройти данные подписчикам.

Вот что у меня есть:

import EventEmitter from "events";

import { fromEvent } from "rxjs";
import { bufferTime, filter, take } from "rxjs/operators";

const ev = new EventEmitter();
const observer = fromEvent(ev, "log").pipe(
  take(10),
  bufferTime(1000),
  filter(logs => !!logs.length)
);

observer.subscribe(data => {
  console.log(`received: ${data.length}`);
});

for (let i = 0; i < 100; i++) {
  ev.emit("log", { hello: "world" });
}

То, что я ожидал, произойдет:

received: 10 для печати 10x с интервалом в 1 секунду.

Что на самом деле произошло:

received: 10 был напечатан один раз, а затем скрипт завершился.

Почему я думаю это происходит

Я новичок в RxJS, но из того, что я могу узнать, оператор take() выдает статус «завершено» после того, как он принял 10 записей, что предотвращает дальнейшие срабатывания подписок.

Как мне сделать это наблюдаемое «повторяющимся», чтобы оно а) занимало максимум 10, б) гарантировало, что оно работает, самое большее, один раз каждые 1000 мс, и в) повторяется вечно?

Ответы [ 2 ]

0 голосов
/ 12 ноября 2018

После прочтения вашего последнего комментария можно решить, как решить проблему: объединить bufferTime с interval, используя zip.

По сути, идея в том, что вы можете установить ритм уведомлений, используя interval - например, вы установили interval(1000), чтобы иметь Observable, который излучает каждую секунду.

Затем вы можете использовать bufferTime(1000, null, 10), чтобы убедиться, что вы генерируете массив каждую 1 секунду или если ваш буфер достигает 10 элементов, в зависимости от того, что наступит раньше.

Теперь, если вы zip эти 2 Наблюдаемые, вы получаете Наблюдаемое, которое испускается каждую секунду из-за interval(1000), и оно также испускает все, что выходит из bufferTime(1000, 0, 10) в последовательности. Таким образом, если источник Observable испускает более 10 элементов в секунду, первые 10 будут отправлены в первом уведомлении, остальные останутся в буфере в Observable и будут отправлены в последующих уведомлениях порциями по 10 каждую секунду.

Код, вероятно, проясняет ситуацию.

const ev = new EventEmitter();
// I merge to the event stream created by 'ev' a single notification emitted after 5 seconds, 
// just to make the thing a bit more complex
const sourceObs = merge(
  fromEvent(ev, "log"),
  of(1).pipe(delay(5000))
)

function simpleObservable(maxEntries: number, waitTime: number) {
  return zip(
    timer(0, waitTime),
    sourceObs.pipe(
      bufferTime(waitTime, null, maxEntries),
    )
  )
  .pipe(
    filter(logs => !!logs[1].length),
    map(logs => logs[1])
  )
}

const maxEntries = 4;
const waitTime = 1000;
simpleObservable(maxEntries, waitTime)
.subscribe(data => {
  console.log(`received: ${data.length}`);
});

for (let i = 0; i < 19; i++) {
  ev.emit("log", { hello: "world " + i + 'A' });
}
// some other events are fired after 8 seconds, to make the source more complex
setTimeout(() => {
  for (let i = 0; i < 17; i++) {
    ev.emit("log", { hello: "world " + i + 'B' });
  }
}, 8000);

Очевидно, что вы должны учитывать, что, если исходный Observable излучает со скоростью, превышающей вашу способность использовать уведомления, у вас могут возникнуть проблемы с памятью.

0 голосов
/ 11 ноября 2018

Попробуйте вставить простой оператор tap(d => console.log(JSON.stringify(d))), до take(10) и после bufferTime, и вы увидите, что происходит.

По сути, у вас есть цикл, который испускает 100 событий. Вне этого цикла вы создаете Observable, который в 100 раз уведомляет объект, испускаемый вашим циклом событий.

Все это происходит синхронно в вашем примере.

Затем Observable преобразуется в трубу, которая делает следующее:

  • сначала получает только первые 10 уведомлений через оператора take
  • затем говорит "накапливать все уведомления, которые происходят в первом 1 секунды и испускают их в виде массива "- учитывая, что Весь процесс эмиссии происходит синхронно, и вы берете первые 10 из них bufferTime(1000) испустит один массив из 10 элементов через 1 секунду
  • больше ничего не будет генерироваться этим потоком, так как take(10) завершает источник Observable после 10 синхронных уведомлений, поэтому оператор filter бесполезен
  • последнее, в подписке вы получаете только 1 уведомление, то есть единственное, отправленное bufferTime

Сказал, что, вероятно, то, чего вы хотите достичь, может быть достигнуто с помощью кода в этом направлении

const ev = new EventEmitter();

function simpleObservable(maxEntries: number, waitTime: number) {
  return fromEvent(ev, "log").pipe(
    bufferCount(maxEntries),
    filter(logs => !!logs.length),
    mergeMap(logs => of(logs).pipe(delay(waitTime)), 1),
  )
}

simpleObservable(10, 1000)
.subscribe(data => {
  console.log(`received: ${data.length}`);
});

for (let i = 0; i < 100; i++) {
  ev.emit("log", { hello: "world" });
}

Ключевым моментом здесь является то, что сначала вы разбиваете поток событий на массивы по 10 событий каждый, и это делается с помощью оператора bufferCount.

Затем вы вводите асинхронность, используя mergeMap с уровнем параллелизма, равным 1 (что эквивалентно оператору concatMap). Вы в основном преобразуете каждый массив, испускаемый bufferCount, в другой наблюдаемый с помощью функции of и применяете задержку в 1 секунду для каждого из новых наблюдаемых. Затем вы объединяете их так, чтобы они излучались друг за другом с разницей во времени в 1 секунду.

ОБНОВЛЕННЫЙ ОТВЕТ после комментария @Lee Benson

bufferTime может быть ответом на вашу проблему.

bufferTime имеет 3 параметра:

  • bufferTimeSpan : указывает срок службы буфера, т. Е. Observable генерирует буфер и сбрасывает его после каждого интервала определяется bufferTimeSpan
  • bufferCreationInterval : оставить для null для нашей цели
  • maxBufferSize : указывает максимальный размер буфера - если размер достигнут, то Observable испускает буфер и сбрасывает его

Итак, если мы используем первый и последний параметр, мы сможем достичь желаемых целей.

Это тест, который я собрал

const ev = new EventEmitter();
// I merge to the event stream created by 'ev' a single notification emitted after 5 seconds, 
// just to make the thing a bit more complex
const sourceObs = merge(
  fromEvent(ev, "log"),
  of(1).pipe(delay(5000))
)

function simpleObservable(maxEntries: number, waitTime: number) {
  return sourceObs.pipe(
    bufferTime(waitTime, null, maxEntries),
    filter(logs => !!logs.length),
  )
}

simpleObservable(10, 2000)
.subscribe(data => {
  console.log(`received: ${data.length}`);
});

for (let i = 0; i < 19; i++) {
  ev.emit("log", { hello: "world" });
}
// some other events are fired after 6 seconds, to make the source more complex
setTimeout(() => {
  for (let i = 0; i < 17; i++) {
    ev.emit("log", { hello: "world" });
  }
}, 6000);
...