Как подписчик может контролировать издателя с противодействующим противодавлением? - PullRequest
0 голосов
/ 16 марта 2020

У меня есть издатель, который может опубликовать sh быстрее, чем подписчик может обработать данные. Чтобы справиться с этим, я начал работать с противодавлением. Поскольку я не хочу отбрасывать какие-либо данные, я использую противодействующее противодавление. Я понял это как возможность подписчика сообщать издателю, когда публиковать sh больше данных, как описано в этом и следующих параграфах .

Издатель - это Flowable, который выполняет свою работу асинхронно параллельно и впоследствии объединяется в последовательный Flowable. Данные должны буферизоваться до 10 элементов, и когда этот буфер заполнен, Flowable не должен публиковать sh больше данных и ждать следующего запроса.

Подписчик является DisposableSubscriber, который запрашивает 10 элементов при запуске. Каждый потребляемый предмет требует некоторых вычислений, и после этого будет запрошен новый предмет.

Мой MWE выглядит следующим образом:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
    src.add(i);
}
Flowable.fromIterable(src)
        .parallel(10, 1)
        .runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
        .flatMap(i -> Single.fromCallable(() -> {
            System.out.println("publisher: " + i);
            Thread.sleep(200);
            return i;
        }).toFlowable())
        .sequential(1)
        .onBackpressureBuffer(10)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .doOnError(Throwable::printStackTrace)
        .subscribeWith(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(10);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("subscriber: " + integer);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                request(1);
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });
try {
    Thread.sleep(1000000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

Я ожидаю, что этот код сделает следующее: подписчик запрашивает первые 10 элементов. Издатель публикует первые 10 статей. Затем подписчик выполняет вычисления в onNext и запрашивает больше элементов, которые издатель опубликует sh.

Что на самом деле происходит: сначала издатель, кажется, безгранично публикует sh элементов. В какой-то момент, например, после 14 опубликованных элементов, подписчик обрабатывает свой первый элемент. Пока это происходит, издатель продолжает публиковать sh элементов. После примерно 30 опубликованных материалов выдается io.reactivex.exceptions.MissingBackpressureException: Buffer is full и поток заканчивается.

Мой вопрос : что я делаю не так? Как я могу позволить подписчику контролировать, если и когда издатель публикует данные? Очевидно, я делаю что-то ужасно неправильно. В противном случае ожидание не будет таким отличным от реальности.

Пример вывода вышеуказанного MWE:

publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full

1 Ответ

1 голос
/ 16 марта 2020

Не эксперт в Rx, но позвольте мне попробовать. observeOn(...) имеет свой собственный размер буфера по умолчанию, равный 128. Таким образом, с самого начала он будет запрашивать у восходящего потока больше, чем может вместить ваш буфер.

observeOn(...) принимает необязательное переопределение размера буфера, но даже если вы его предоставите, ParallelFlowable будет вызывать ваш метод flatMap(...) чаще, чем вы хотите. Я не уверен на 100%, почему, может быть, он имеет свою собственную внутреннюю буферизацию, которую он выполняет при объединении рельсов в последовательную.

Я думаю, что вы можете приблизиться к желаемому поведению, используя flatMap(...) вместо parralel(...), предоставляя аргумент maxConcurrency.

Еще одна вещь, о которой следует помнить, это то, что вы не хотите вызывать subscribeOn(...) - это должно повлиять на Flowable в восходящем потоке целиком. Поэтому, если вы уже звоните parallel(...).runOn(...), это не имеет никакого эффекта, или эффект будет неожиданным.

Вооружившись вышеизложенным, я думаю, это приблизит вас к тому, что вы ищете:

    List<Integer> src = new ArrayList<>();
    for (int i = 0; i < 200; i++) {
        src.add(i);
    }
    Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
    Flowable.fromIterable(src)
            .flatMap(
                    i -> Flowable.just( i )
                            .subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
                            .map( __ -> {
                                System.out.println("publisher: " + i);
                                Thread.sleep(200);
                                return i;
                            } ),
                    10) // max concurrency
            .observeOn(Schedulers.newThread(), false, 10) // override buffer size
            .doOnError(Throwable::printStackTrace)
            .subscribeWith(new DisposableSubscriber<Integer>() {
                @Override
                protected void onStart() {
                    request(10);
                }
                @Override
                public void onNext(Integer integer) {
                    System.out.println("subscriber: " + integer);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    request(1);
                }
                @Override
                public void onError(Throwable t) {
                }
                @Override
                public void onComplete() {
                }
            });
    try {
        Thread.sleep(1000000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
...