Rx-Java2 Floawable.rebatchRequests не перезапускает запросы - PullRequest
0 голосов
/ 30 апреля 2018

Рассмотрим следующий пользовательский случай:

Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
    .blockingSubscribe(v -> consumeSlowly(v));

Ожидаемый результат при выполнении предыдущего кода: я должен получить исключение нехватки памяти, поскольку Flowable.blockingSubscribe запрашивает элементы Long.MAX_VALUE, итератор теряет терабайт данных, и подписчик не может поддерживать .


Чтобы решить эту проблему, я добавляю rebatchRequests в мой код:

Iterable<Object> it = getATerabyteOfDataOnDemand();
Flowable.fromIterable(it)
    .rebatchRequests(128)
    .blockingSubscribe(v -> consumeSlowly(v));

Мой ожидаемый результат при выполнении следующего кода состоит в том, что все работает отлично, поскольку документация для Flowable.rebatchRequests гласит, что:

Этот оператор позволяет запретить нисходящему потоку запускать неограниченный режим с помощью запроса (Long.MAX_VALUE) или компенсировать накладные расходы по отдельным элементам при малых и частых запросах.

На практике я получаю еще одно исключение нехватки памяти. Как мне использовать противодавление, чтобы предотвратить сливаемость итератора с помощью потока?

1 Ответ

0 голосов
/ 30 апреля 2018

Проблема в том, что у вас есть синхронный поток и blockingSubscribe ставит в очередь элементы, которые затем не могут быть использованы из-за fromIterable, все еще излучающего в том же потоке. rebatchRequest здесь не помогает, потому что его потребитель все еще является неограниченным blockingSubscribe, поэтому он всегда берет предметы и потребляет fromIterable полностью.

Либо вам не нужно blockingSubscribe, либо вы должны рассмотреть возможность использования blockingIterable() с for-each, который запрашивает больше только тогда, когда текущий поток фактически потребляет элементы. В противном случае вам придется использовать перегрузку blockingSubscribe(Subscriber) и запросить вручную:

source.blockingSubscribe(new Subscriber<T>() {
    Subscription upstream;
    @Override public void onSubscribe(Subscription s) {
        upstream = s;
        s.request(1);
    }

    @Override public void onNext(T item) {
        consumeSlowly(v);
        upstream.request(1);
    }

    @Override public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override public void onComplete() {
    }
});

Обратите внимание, что у вас уже есть Iterable, который вы можете использовать синхронно с for-each без участия RxJava.

...