Как настроить обратное давление в Spring WebFlux? - PullRequest
0 голосов
/ 14 февраля 2020

Я пытаюсь понять, как применить противодавление в Spring WebFlux. Я понимаю теорию противодавления, но не могу воспроизвести ее, поэтому не до конца ее понимаю.

Давайте рассмотрим следующий пример:

public void test() throws InterruptedException {
    EmitterProcessor<String> processor = EmitterProcessor.create();

    new Thread(() -> {
        int i = 0;
        while(runThread) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
            processor.onNext("Value: " + i);
            i++;
        }
        processor.onComplete();
    }).start();

    processor
            .subscribe(makeSubscriber("FIRST - "), Throwable::printStackTrace);
}

private Consumer<String> makeSubscriber(String label) {
    return v -> {
        System.out.println(label + v);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignored) {
        }
    };
}

Я создал Hot Поток в виде EmitterProcessor и в отдельном потоке я начинаю производить данные для него. Чуть ниже я подписываюсь на это. Подписчик медленнее, чем скорость, с которой производятся элементы, поэтому проблемы должны начаться, верно? Но абонентский лог c запускается в потоке производителя. Когда я вызываю processor.onNext (), он синхронно вызывает всех подписчиков, поэтому, если подписчики работают медленно, издатель также замедляется. Таким образом, обратное давление даже не кажется полезным.

Я также попытался создать два приложения Spring Boot WebFlux, одно с конечной точкой Flux и одно, использующее конечную точку, поэтому я могу быть уверен, что потребитель работает на отдельная тема. Но любая попытка противодавления со стороны потребителя ничего не дает. Нет заполненного буфера, ничего не сбрасывается и ничего!

Кто-нибудь может привести конкретный пример противодавления? Желательно в Spring WebFlux, но я возьму любую реактивную библиотеку Java.

1 Ответ

0 голосов
/ 14 февраля 2020

документация к выбранному вами варианту метода subscribe гласит:

Подписка будет запрашивать неограниченный спрос (Long.MAX_VALUE).

, что Вы отключили противодавление самостоятельно.

Чтобы использовать противодавление, подпишитесь на Flux.subscribe (Subscriber)

...