Механизм противодавления в Spring Web-Flux - PullRequest
0 голосов
/ 09 сентября 2018

Я начинающий в Spring Web-Flux . Я написал контроллер следующим образом:

@RestController
public class FirstController 
{
    @GetMapping("/first")
    public Mono<String> getAllTweets() 
    {
        return Mono.just("I am First Mono")
    }
}

Я знаю, что одно из реактивных преимуществ - Противодавление , и оно может сбалансировать запрос или частоту ответов. Я хочу понять, как использовать механизм противодавления в Spring Web-Flux .

1 Ответ

0 голосов
/ 09 сентября 2018

Противодавление в WebFlux

Чтобы понять, как работает Backpressure в текущей реализации инфраструктуры WebFlux, мы должны повторить транспортный уровень, используемый здесь по умолчанию. Как мы помним, нормальная связь между браузером и сервером (связь сервера с сервером обычно одинакова) осуществляется через TCP-соединение. WebFlux также использует этот транспорт для связи между клиентом и сервером. Затем, чтобы получить значение термина контроль противодавления , мы должны повторить, что означает противодавление с точки зрения спецификации Reactive Streams.

Базовая семантика определяет, как передача элементов потока регулируется обратным давлением.

Итак, из этого утверждения мы можем заключить, что в Reactive Streams обратное давление - это механизм, который регулирует спрос посредством передачи (уведомления) того, сколько элементов может получить получатель; И здесь у нас есть хитрый момент. TCP имеет байтовую абстракцию, а не логическую абстракцию элементов. Говоря о противодавлении, мы обычно хотим контролировать количество логических элементов, отправленных / полученных в / из сети. Несмотря на то, что TCP имеет свой собственный контроль потока (см. Значение здесь и анимацию там ), этот контроль потока по-прежнему для байтов, а не для логических элементов.

В текущей реализации модуля WebFlux противодавление регулируется управлением транспортным потоком, но оно не раскрывает реальную потребность получателя. Чтобы наконец увидеть поток взаимодействия, см. Следующую диаграмму:

enter image description here

Для простоты вышеприведенная диаграмма показывает связь между двумя микроуслугами, где левый отправляет потоки данных, а правый использует этот поток. Следующий нумерованный список дает краткое объяснение этой диаграммы:

  1. Это инфраструктура WebFlux, которая должным образом заботится о преобразовании логических элементов в байты и обратно и передаче / получении их в / из TCP (сеть).
  2. Это начало длительной обработки элемента, которая запрашивает следующие элементы после завершения задания.
  3. Здесь, в то время как бизнес-логика не требует, WebFlux ставит в очередь байты, которые приходят из сети без их подтверждения (бизнес-логика не требуется).
  4. Из-за особенностей управления потоком TCP Служба A все еще может отправлять данные в сеть.

Как мы можем заметить на диаграмме выше, спрос, представленный получателем, отличается от спроса отправителя (спрос здесь в логических элементах). Это означает, что требование того и другого является изолированным и работает только для взаимодействия WebFlux <-> Business logic (Service) и менее подвержено обратному давлению для взаимодействия Service A <-> Service B.

Все это означает, что регулирование противодавления в WebFlux не так справедливо, как мы ожидаем.

Но я все еще хочу знать, как контролировать противодавление

Если мы все еще хотим иметь несправедливый контроль противодавления в WebFlux, мы можем сделать это при поддержке операторов Project Reactor, таких как limitRate(). В следующем примере показано, как мы можем использовать этот оператор:

@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {

    return tweetService.process(tweetsFlux.limitRate(10))
                       .then();
}

Как видно из примера, оператор limitRate() позволяет определить количество элементов для предварительной выборки за один раз. Это означает, что даже если конечный подписчик запрашивает Long.MAX_VALUE элементы, оператор limitRate разделяет эту потребность на куски и не позволяет потреблять больше, чем это сразу. То же самое мы можем сделать с процессом отправки элементов:

@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {

    return tweetService.retreiveAll()
                       .limitRate(10);
}

Приведенный выше пример показывает, что даже если WebFlux запрашивает более 10 элементов за раз, limitRate() ограничивает требование до размера предварительной выборки и не позволяет использовать больше указанного количества элементов одновременно.

Другой вариант - реализовать собственный Subscriber или расширить BaseSubscriber из Project Reactor. Например, ниже приведен наивный пример того, как мы можем это сделать:

class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {

    int consumed;
    final int limit = 5;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(limit);
    }

    @Override
    protected void hookOnNext(T value) {
        // do business logic there 

        consumed++;

        if (consumed == limit) {
            consumed = 0;

            request(limit);
        }
    }
}

Справедливое противодавление с протоколом RSocket

Чтобы достичь противодействия логических элементов через границы сети, нам необходим соответствующий протокол для этого. К счастью, есть протокол под названием RScoket protocol . RSocket - это протокол уровня приложения, который позволяет передавать реальный спрос через границы сети. Существует реализация RSocket-Java этого протокола, которая позволяет настроить сервер RSocket. В случае межсерверной связи та же библиотека RSocket-Java также обеспечивает реализацию клиента. Чтобы узнать больше о том, как использовать RSocket-Java, см. Следующие примеры здесь . Для связи между браузером и сервером существует реализация RSocket-JS , которая позволяет организовать потоковую связь между браузером и сервером через WebSocket.

Известные рамки поверх RSocket

В настоящее время существует несколько платформ, построенных на основе протокола RSocket.

Proteus

Одним из фреймворков является проект Proteus, который предлагает полноценные микросервисы, построенные поверх RSocket. Кроме того, Proteus хорошо интегрирован со средой Spring, поэтому теперь мы можем добиться справедливого контроля противодавления (см. Примеры there )

Дальнейшие чтения

...