Безопасно ли подписываться на наблюдаемые с помощью асинхронной функции? - PullRequest
0 голосов
/ 23 апреля 2019

У меня есть источник событий, который отправляет события с частотой 50 Гц. Я хотел бы подписаться на этот источник с асинхронным методом. Код выглядит следующим образом:

this.emitter = fromEventPattern(this.addHandler, this.removeHandler, (err, char) => [err, char]);
this.rxSubscription = this.emitter.subscribe(this.handleUpdatedValuesComingFromSensor);

и

       handleUpdatedValuesComingFromSensor = async (arr: any[]): Promise<void> => {
   ...
   await someMethodAsync();
   ...
}

Возможно, я ошибаюсь, но у меня сложилось впечатление, что ожидание там заставляет эмиттер немедленно вызывать onNext (), потому что я вышел из метода.

Это очень сложно отладить с помощью консольных вызовов из-за частоты событий.

Я прав или нет?

Спасибо за вашу помощь.

РЕДАКТИРОВАТЬ 1 :

Я использую таргетинг машинописи ES2015, поэтому для async / await создается конечный автомат.

Если я прав, как я могу убедиться, что звонки не перекрываются? Мне нужно вычислить средние значения, которые я получаю.

Ответы [ 2 ]

0 голосов
/ 23 апреля 2019

, ожидая там, немедленно вызывает вызовы эмитента onNext (), потому что я вышел из метода

Вы правы.Rx игнорирует возвращаемые типы своих функций подписки, поэтому игнорирует обещание, возвращаемое вашей функцией async, когда она достигает своей первой await.Это означает:

  1. Как только в наблюдаемом прибывает другой элемент, Rx снова вызовет вашу функцию подписки.Он игнорировал обещание, которое было возвращено, поэтому он не знает, что старый вызов все еще выполняется.
  2. Исключения из вашей функции async будут игнорироваться, поскольку обещание было проигнорировано.Некоторые библиотеки обещаний имеют глобальное событие «ошибка ненаблюдаемого обещания», которое может это обработать.
0 голосов
/ 23 апреля 2019

Мне не совсем понятно, что вас беспокоит.Это верно, ваш метод будет вызываться один раз для каждого элемента.Он ничего не пропустит и не обрежет ваш метод на полпути, но:

  1. Он не будет ждать окончания одной итерации handleUpdatedValuesComingFromSensor, прежде чем начинать другую (при условии, что handleUpdatedValuesComingFromSensor действительно что-то делаетасинхронный), поэтому вы можете иметь несколько экземпляров handleUpdatedValuesComingFromSensor в полете одновременно.
  2. Аналогично, если у вас несколько подписчиков, то не будет ждать окончания handleUpdatedValuesComingFromSensor перед отправкой события следующемуподписчик.
...