Как заставить газ rxjs работать правильно? - PullRequest
0 голосов
/ 25 мая 2019

У меня есть поток данных, который никогда не останавливается. Алгоритм должен быть запущен для отдельных элементов потока данных. Алгоритм не может идти в ногу с потоком данных, поэтому элементы данных, поступающие во время обработки алгоритма, должны быть отброшены. Я пытаюсь использовать оператор rxjs throttle, чтобы выполнить это, но дроссель не сбрасывается, когда я излучаю из моего Subject, который я использую как durationSelector . Однако дроссель работает, когда я заменяю Subject на оператор interval. Я предполагаю, что упускаю что-то простое, но не вижу этого.

import { Subject, interval, range } from 'rxjs';
import { throttle } from 'rxjs/operators';

function algorithm() {
  // simulate algorithm taking 1000ms
  return new Promise(resolve => setTimeout(resolve, 1000));
}
const dataStream = interval(200);
const algorithmDone = new Subject();
const pipeline = dataStream.pipe(throttle(sv => algorithmDone));
pipeline.subscribe(x => {
  console.log('here ' + x);
  algorithm();
  algorithmDone.next(0);
});

Код запуска здесь: https://stackblitz.com/edit/rxjs-tbs8qz

Выше есть поток данных, генерирующий данные каждые 200 мс. Алгоритм занимает 1000 мс, поэтому необходимо печатать примерно каждый пятый элемент из dataStream. Я получаю только первый номер с вышеуказанной реализацией. Если я заменим algorithmDone в throttle() на interval(1000), я получу ожидаемый результат. Почему это не работает с моим Subject?

1 Ответ

2 голосов
/ 25 мая 2019

Функция algorithm() возвращает обещание, но вы используете его, как если бы оно было синхронным.

Изменение:

pipeline.subscribe(x => {
  console.log('here ' + x);
  algorithm();
  algorithmDone.next(0);
});

Для любого:

pipeline.subscribe(x => {
  console.log('here ' + x);
  algorithm().then(() => algorithmDone.next(0));
});

или:

pipeline.subscribe(async x => {
  console.log('here ' + x);
  await algorithm();
  algorithmDone.next(0);
});
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...