Rx Java: начало сердцебиения после последнего выброса, пока не прибудет следующий - PullRequest
0 голосов
/ 04 мая 2020

Я работаю над потоком Http SSE, и мне нужно поддерживать http соединение. Для этого я отправляю пустое сообщение (с именем heartbeat, все остальные называются реальными событиями) каждые 15 секунд.

source
.map(EventSource.Event::event)
.mergeWith(interval(0, 15, TimeUnit.SECONDS).map(t -> event("").withName("heartbeat"))) 

Недостатком этого решения является то, что я отправляю событие heartbeat, если последнее реальное событие отправляется в <15 СЕКУНД. </p>

Я хочу начать отправку событий сердцебиения после того, как последнее полученное сообщение устарело на 15 СЕКОНДОВ назад, и прекратить отправку этих событий при возврате реальных событий.

Примерно так:

--- xxx ------ x ---------- x -------- ------- ч --------------- ч --------------- ч - х ------- -------- hx ---------- x --------------- h ----

  • x - реальные события
  • h - события сердцебиения
  • - равны 1 ВТОРОЙ

Любая помощь приветствуется:)

1 Ответ

0 голосов
/ 04 мая 2020

Я думаю, switchMap оператор - это то, что вы ищете:

source
    .map(EventSource.Event::event)
    .startWith(event("").withName("heartbeat"))
    .switchMap (event -> interval(15, TimeUnit.SECONDS)
        .map(t -> event("").withName("heartbeat"))
        .startWith(event)
    )

РЕДАКТИРОВАТЬ

Как вы упомянули в комментарии, вам нужно запустить hearBeat, даже если еще не было никаких событий. Для этого вам нужно добавить класс Init в ваш EventSource.Event и использовать оператор filter(), чтобы игнорировать его позже. См. Отредактированный код.

EDIT 2

убрал initialDelay и запустил его с событием HeartBeat

...