Опрос с RxJS, который может восстановить пропущенные события - PullRequest
1 голос
/ 09 июня 2019

Я пытаюсь использовать RxJS для опроса событий. Тем не менее, у меня есть доступ только к одной функции, которая getEvent(). Я могу сделать 2 вещи с помощью функции getEvent:

  1. getEvent("latest") - это даст мне последний объект события
  2. getEvent(eventId) - Я передаю целое число, и оно выдаст мне объект события, соответствующий eventId.

Идентификаторы событий всегда увеличиваются от 0, но проблема в том, что если мой интервал опроса не достаточно мал, я могу пропустить события.

Например, если я делаю getEvent("latest") и получаю событие с идентификатором 1, это здорово. Но если в следующий раз я позвоню, я получу ID 3, я знаю, что пропустил событие.

В этом случае я хочу использовать наблюдаемый более высокий порядок для вызова getEvent(2) и getEvent(3), чтобы потребителю создаваемого мною потока не приходилось беспокоиться о пропущенном событии.

Прямо сейчас, все, что у меня есть, примерно так:

timer(0, 500).pipe(
  concatMap(() => from(getEvent("latest"))
)

Для некоторого контекста я работаю над этим постом: https://itnext.io/polling-using-rxjs-b56cd3531815

1 Ответ

2 голосов
/ 09 июня 2019

Использование expand для рекурсивного вызова GET идеально подходит здесь.Вот пример с DEMO :

const source = timer(0, 2000)

const _stream = new Subject();
const stream = _stream.asObservable();

const s1 = source.pipe(tap(random)).subscribe()    

const sub = stream.pipe(
  startWith(0),
  pairwise(),  
  concatMap((v: Array<number>) => {
    let missing = v[1] - v[0];
    return missing ? getMissing(v[0], missing) : EMPTY
  })
).subscribe(console.log)

function getMissing(start, count) {
  return getById(start).pipe(
    expand(id => getById(id+1)),
    take(count)
  )  
}

// helper functions for DEMO

let i = 1;
function random() {. // THIS IS YOUR getEvent('latest')
  if (i < 10) {
    i+=2;
    _stream.next(i
      // (Math.floor(Math.random() * 8))
    )
  }
}

function getById(id) {.  // THIS IS YOUR getEvent(eventId)
  return of(id).pipe(delay(1000)) // delay to mimic network
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...