Я пытаюсь понять, как применить противодавление в Spring WebFlux. Я понимаю теорию противодавления, но не могу воспроизвести ее, поэтому не до конца ее понимаю.
Давайте рассмотрим следующий пример:
public void test() throws InterruptedException {
EmitterProcessor<String> processor = EmitterProcessor.create();
new Thread(() -> {
int i = 0;
while(runThread) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
processor.onNext("Value: " + i);
i++;
}
processor.onComplete();
}).start();
processor
.subscribe(makeSubscriber("FIRST - "), Throwable::printStackTrace);
}
private Consumer<String> makeSubscriber(String label) {
return v -> {
System.out.println(label + v);
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
};
}
Я создал Hot Поток в виде EmitterProcessor и в отдельном потоке я начинаю производить данные для него. Чуть ниже я подписываюсь на это. Подписчик медленнее, чем скорость, с которой производятся элементы, поэтому проблемы должны начаться, верно? Но абонентский лог c запускается в потоке производителя. Когда я вызываю processor.onNext (), он синхронно вызывает всех подписчиков, поэтому, если подписчики работают медленно, издатель также замедляется. Таким образом, обратное давление даже не кажется полезным.
Я также попытался создать два приложения Spring Boot WebFlux, одно с конечной точкой Flux и одно, использующее конечную точку, поэтому я могу быть уверен, что потребитель работает на отдельная тема. Но любая попытка противодавления со стороны потребителя ничего не дает. Нет заполненного буфера, ничего не сбрасывается и ничего!
Кто-нибудь может привести конкретный пример противодавления? Желательно в Spring WebFlux, но я возьму любую реактивную библиотеку Java.