Вот код, который у меня есть для буферизации и преобразования входящих событий:
public Publisher<Collection<EventTO>> logs(String eventId) {
ConnectableObservable<Event> connectableObservable = eventsObservable
.share().publish();
connectableObservable.connect();
connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
.filter(event -> event.getId().equals(eventId))
.buffer(1, TimeUnit.SECONDS, 50)
.map(eventsMapper::mapCollection);
}
Проблема в том, что Flowable
возвращает пустой список каждую секунду, хотя в eventsObservable
.
не публикуется никаких событий.
Есть ли способ удерживать .buffer
, пока не найдется хотя бы один объект?
Примечание:
Похоже, есть способ сделать это в C # (описано здесь: https://stackoverflow.com/a/30090185/668148).
Но как это сделать в Java?