Рассмотрим следующий пользовательский случай:
Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.blockingSubscribe(v -> consumeSlowly(v));
Ожидаемый результат при выполнении предыдущего кода: я должен получить исключение нехватки памяти, поскольку Flowable.blockingSubscribe запрашивает элементы Long.MAX_VALUE, итератор теряет терабайт данных, и подписчик не может поддерживать .
Чтобы решить эту проблему, я добавляю rebatchRequests в мой код:
Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
.rebatchRequests(128)
.blockingSubscribe(v -> consumeSlowly(v));
Мой ожидаемый результат при выполнении следующего кода состоит в том, что все работает отлично, поскольку документация для Flowable.rebatchRequests гласит, что:
Этот оператор позволяет запретить нисходящему потоку запускать неограниченный режим с помощью запроса (Long.MAX_VALUE) или компенсировать накладные расходы по отдельным элементам при малых и частых запросах.
На практике я получаю еще одно исключение нехватки памяти. Как мне использовать противодавление, чтобы предотвратить сливаемость итератора с помощью потока?