Поток от повторяемого, как справиться с обратным давлением - PullRequest
0 голосов
/ 21 декабря 2018

Мне нужно справиться с обратным давлением в моем потоке, который получает в качестве входных данных список объектов.Размер списка варьируется от нескольких сотен до нескольких сотен тысяч элементов.Фактический код:

Flux.fromIterable(alarms)
            .limitRate(parallelism)
            .parallel(parallelism)
            .runOn(Schedulers.elastic(), bufferSize)
            .doOnNext(reactiveHandleDataService::handleAlarm)
;

Параметр limitRange просто заставляет отказаться от списка определенного размера, что мне не нужно.Мне нужно, чтобы все полученные данные были переданы на реактивHandleDataService, я не могу потерять сообщение.

Как я могу справиться с обратным давлением в этом случае?Я не нашел много примеров, хорошо объясняющих проблему, особенно с использованием итерируемого источника.

Я использую Californium-SR3 для выпуска реактора, и это часть приложения с пружинной загрузкой.

1 Ответ

0 голосов
/ 21 декабря 2018

это задание handleAlarm, которое имеет обратное давление, если не может поддерживать много данных, чтобы отправить запрос на новые данные после обработки 1. Если у вас нет обратного давления для handleAlarm, вы можете добавить задержку

Flux.fromIterable(alarms)
        .limitRate(parallelism)
        .delayElements(Duration.ofMillis(10))
        .doOnNext(reactiveHandleDataService::handleAlarm)
        .subscribeOn(Schedulars.elastic)

;

и если вам нужно противодавление, почему вы работаете параллельно

...