Версии Java SDK Couchbase до 3.x (которые еще не вышли на момент написания этой статьи) используют RxJava версии 1.
Вызовы flatmap
, как они у вас сейчас, будут публиковать операцииво внутренний буфер для асинхронного выполнения, возвращая Observable
для отслеживания каждого из них.Это означает, что первый flatmap
будет использовать вывод вашего from
вызова неограниченным образом.Другими словами, он будет читать весь список намного быстрее, чем операции.Я ожидаю, что ошибка OOM, которую вы видите, происходит из-за переполнения внутреннего буфера Couchbase.
Чтобы исправить это, вы можете использовать вариант flatmap
, который ограничивает количество ожидающих подписок.Вы просто добавляете второй целочисленный параметр к вашему вызову flatmap
.Таким образом, у вас будет .flatmap(new Func1<~>..., 10)
, чтобы ограничить себя 10 невыполненными операциями за раз.
Стандартный буфер в Couchbase составляет около 16000 невыполненных операций, но это гораздо больше, чем нужно для насыщения большинства систем.
Для справки см. Связанную статью Пост переполнения стека об ограничении пропускной способности для загрузки файлов.