Как создать Flux, включающий исторические и будущие предметы? - PullRequest
1 голос
/ 04 марта 2020

У меня есть приложение, которое получает события (используя Spring Application Listeners), и я хочу создать поток некоторых исторических событий (которые могут быть прочитаны из постоянства) и бесконечный поток входящих событий. Цель состоит в том, чтобы предоставить метод publi c, который возвращает поток всех событий, которые когда-либо наблюдались, т. Е. Конкатенации начальных исторических событий, всех наблюдаемых событий и всех будущих событий. Предполагается, что следующий сервис предоставляет API в терминах метода stream ().

@Service
public class EventService implements ApplicationListener<Event>{

 private Flux<Integer> eventStream = getHistoricalEvents();

 private Flux<Integer> getHistoricalEvents() {
    return Flux.just(1,2,3,4,5);
 }

 @Override
 public void onApplicationEvent(Event event) {
    eventStream = eventStream.concatWithValues(event.hashCode());
 }

 public Flux<Integer> stream(){
    return eventStream;
 }
}

Это работает для достижения первых двух целей, то есть клиенты получают начальный набор исторических событий и все события, которые До сих пор наблюдалось.

Как я могу добавить будущие события? В идеале я ищу решение, которое достигает всего этого идиоматически, поскольку я новичок в реактивном программировании.

1 Ответ

0 голосов
/ 04 марта 2020

Вы уже научились concat с существующими постоянными историческими событиями. Что касается всех будущих событий, это звучит очень похоже на то, что вам нужно Flux.cache():

Превратите этот поток в горячий источник и кешируйте последние испущенные сигналы для дальнейшего подписчика. Сохранит неограниченную громкость onNext сигналов. Завершение и ошибка также будут воспроизведены.

...