Благодаря @marianosimone, правильное решение:
return io.reactivex.Observable.fromIterable(objects).filter(new Predicate<SomeObject>() {
@Override
public boolean test(SomeObject aObject) throws Exception {
return aObject.isSomething();
}
}).first(aDefaultObject);
Flowable
также работает, но не рекомендуется официальными документами :
Когда использовать Observable
- У вас самый длинный поток не более 1000 элементов: то есть у вас так мало элементов со временем, что практически нет шансов для OOME в вашем приложении.
- Вы имеете дело с событиями графического интерфейса, такими как движения мыши или события касания: они редко могут быть подвергнуты обратному давлению разумно и не так часто. Вы можете быть в состоянии обработать частоту элемента 1000 Гц или менее с помощью Observable, но все равно подумайте об использовании выборки / опровержения.
- Ваш поток по сути синхронный, но ваша платформа не поддерживает потоки Java или вы пропускаете функции из него. Использование Observable в целом имеет меньшую нагрузку, чем Flowable. (Вы можете также рассмотреть IxJava, который оптимизирован для потоков Iterable, поддерживающих Java 6+).
Когда использовать Flowable
- Работа с 10 000+ элементов, которые где-то генерируются, и таким образом цепочка может сказать источнику ограничить количество, которое она генерирует.
- Чтение (разбор) файлов с диска по своей сути является блокирующим и основанным на извлечении, что хорошо работает с противодавлением, когда вы контролируете, например, сколько строк вы читаете из этого для указанной суммы запроса).
- Чтение из базы данных через JDBC также блокируется и основано на извлечении и контролируется вами, вызывая ResultSet.next () для каждого последующего запроса.
- Сетевой (потоковый) ввод-вывод, когда либо помогает сеть, либо используемый протокол поддерживает запрос некоторой логической суммы.
- Многие блокирующие и / или извлекаемые источники данных, которые в конечном итоге могут получить неблокирующий реактивный API / драйвер в будущем.