Буферизация RxJava - игнорирование нулевых элементов - PullRequest
0 голосов
/ 20 января 2019

Вот код, который у меня есть для буферизации и преобразования входящих событий:

public Publisher<Collection<EventTO>> logs(String eventId) {
    ConnectableObservable<Event> connectableObservable = eventsObservable
        .share().publish();
    connectableObservable.connect();

    connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
        .filter(event -> event.getId().equals(eventId))
        .buffer(1, TimeUnit.SECONDS, 50)
        .map(eventsMapper::mapCollection);
}

Проблема в том, что Flowable возвращает пустой список каждую секунду, хотя в eventsObservable.

не публикуется никаких событий.

Есть ли способ удерживать .buffer, пока не найдется хотя бы один объект?

Примечание: Похоже, есть способ сделать это в C # (описано здесь: https://stackoverflow.com/a/30090185/668148). Но как это сделать в Java?

1 Ответ

0 голосов
/ 23 января 2019

Как предположил Марк Кин, .distinctUntilChanged добивается цели.

Таким образом, следующий код будет выдвигать список событий, если после буферизации есть 1+ элементов:

connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
    .filter(event -> event.getId().equals(eventId))
    .buffer(1, TimeUnit.SECONDS, 50)
    .distinctUntilChanged()             // <<<======  
    .map(eventsMapper::mapCollection);
...