Противодавление в WebFlux
Чтобы понять, как работает Backpressure в текущей реализации инфраструктуры WebFlux, мы должны повторить транспортный уровень, используемый здесь по умолчанию. Как мы помним, нормальная связь между браузером и сервером (связь сервера с сервером обычно одинакова) осуществляется через TCP-соединение. WebFlux также использует этот транспорт для связи между клиентом и сервером.
Затем, чтобы получить значение термина контроль противодавления , мы должны повторить, что означает противодавление с точки зрения спецификации Reactive Streams.
Базовая семантика определяет, как передача элементов потока регулируется обратным давлением.
Итак, из этого утверждения мы можем заключить, что в Reactive Streams обратное давление - это механизм, который регулирует спрос посредством передачи (уведомления) того, сколько элементов может получить получатель; И здесь у нас есть хитрый момент. TCP имеет байтовую абстракцию, а не логическую абстракцию элементов. Говоря о противодавлении, мы обычно хотим контролировать количество логических элементов, отправленных / полученных в / из сети. Несмотря на то, что TCP имеет свой собственный контроль потока (см. Значение здесь и анимацию там ), этот контроль потока по-прежнему для байтов, а не для логических элементов.
В текущей реализации модуля WebFlux противодавление регулируется управлением транспортным потоком, но оно не раскрывает реальную потребность получателя. Чтобы наконец увидеть поток взаимодействия, см. Следующую диаграмму:
Для простоты вышеприведенная диаграмма показывает связь между двумя микроуслугами, где левый отправляет потоки данных, а правый использует этот поток. Следующий нумерованный список дает краткое объяснение этой диаграммы:
- Это инфраструктура WebFlux, которая должным образом заботится о преобразовании логических элементов в байты и обратно и передаче / получении их в / из TCP (сеть).
- Это начало длительной обработки элемента, которая запрашивает следующие элементы после завершения задания.
- Здесь, в то время как бизнес-логика не требует, WebFlux ставит в очередь байты, которые приходят из сети без их подтверждения (бизнес-логика не требуется).
- Из-за особенностей управления потоком TCP Служба A все еще может отправлять данные в сеть.
Как мы можем заметить на диаграмме выше, спрос, представленный получателем, отличается от спроса отправителя (спрос здесь в логических элементах). Это означает, что требование того и другого является изолированным и работает только для взаимодействия WebFlux <-> Business logic (Service) и менее подвержено обратному давлению для взаимодействия Service A <-> Service B.
Все это означает, что регулирование противодавления в WebFlux не так справедливо, как мы ожидаем.
Но я все еще хочу знать, как контролировать противодавление
Если мы все еще хотим иметь несправедливый контроль противодавления в WebFlux, мы можем сделать это при поддержке операторов Project Reactor, таких как limitRate()
. В следующем примере показано, как мы можем использовать этот оператор:
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
return tweetService.process(tweetsFlux.limitRate(10))
.then();
}
Как видно из примера, оператор limitRate()
позволяет определить количество элементов для предварительной выборки за один раз. Это означает, что даже если конечный подписчик запрашивает Long.MAX_VALUE
элементы, оператор limitRate
разделяет эту потребность на куски и не позволяет потреблять больше, чем это сразу. То же самое мы можем сделать с процессом отправки элементов:
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
return tweetService.retreiveAll()
.limitRate(10);
}
Приведенный выше пример показывает, что даже если WebFlux запрашивает более 10 элементов за раз, limitRate()
ограничивает требование до размера предварительной выборки и не позволяет использовать больше указанного количества элементов одновременно.
Другой вариант - реализовать собственный Subscriber
или расширить BaseSubscriber
из Project Reactor. Например, ниже приведен наивный пример того, как мы можем это сделать:
class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {
int consumed;
final int limit = 5;
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(limit);
}
@Override
protected void hookOnNext(T value) {
// do business logic there
consumed++;
if (consumed == limit) {
consumed = 0;
request(limit);
}
}
}
Справедливое противодавление с протоколом RSocket
Чтобы достичь противодействия логических элементов через границы сети, нам необходим соответствующий протокол для этого. К счастью, есть протокол под названием RScoket protocol . RSocket - это протокол уровня приложения, который позволяет передавать реальный спрос через границы сети.
Существует реализация RSocket-Java этого протокола, которая позволяет настроить сервер RSocket. В случае межсерверной связи та же библиотека RSocket-Java также обеспечивает реализацию клиента. Чтобы узнать больше о том, как использовать RSocket-Java, см. Следующие примеры здесь .
Для связи между браузером и сервером существует реализация RSocket-JS , которая позволяет организовать потоковую связь между браузером и сервером через WebSocket.
Известные рамки поверх RSocket
В настоящее время существует несколько платформ, построенных на основе протокола RSocket.
Proteus
Одним из фреймворков является проект Proteus, который предлагает полноценные микросервисы, построенные поверх RSocket. Кроме того, Proteus хорошо интегрирован со средой Spring, поэтому теперь мы можем добиться справедливого контроля противодавления (см. Примеры there )
Дальнейшие чтения