Закрытие соединения Reactor Netty по кодам ошибок - PullRequest
0 голосов
/ 24 марта 2020

Я использую Reactor Netty через среду Spring Webflux для отправки данных в удаленную сеть доставки контента. Когда клиентский запрос завершен, поведение Reactor Netty по умолчанию состоит в том, чтобы поддерживать соединение активным и возвращать его в базовый пул соединений.

Некоторые сети доставки контента рекомендуют повторно разрешать DNS для определенных типов кодов состояния. (например, 500 внутренняя ошибка сервера). Для этого я добавил пользовательские Netty DnsNameResolver и DnsCache, но мне также необходимо закрыть соединение, иначе оно будет возвращено обратно в пул и DNS не будет повторно разрешен.

Как можно go закрыть соединение по кодам ошибок?

До сих пор я придумал следующий обходной путь, добавив ConnectionObserver к TcpClient в Reactor Netty:

TcpClient tcpClient = TcpClient.create()
        .observe((connection, newState) -> {
            if (newState == State.RELEASED && connection instanceof HttpClientResponse) {
                HttpResponseStatus status = ((HttpClientResponse) connection).status();
                if (status.codeClass() != HttpStatusClass.SUCCESS) {
                    connection.dispose();
                }
            }
        });

А именно, если соединение было разорвано (т. Е. Помещено обратно в пул соединений) и оно было вызвано ответом клиента HTTP с неудачным кодом состояния, затем закройте соединение.

Такой подход кажется неуклюжим. Если соединение освобождается после кода ошибки, и наблюдатель закрывает это соединение, может ли новый запрос получить такое же соединение параллельно? Внутренняя структура обрабатывает вещи изящно, или это условие гонки, которое лишает законной силы вышеупомянутый подход?

Заранее спасибо за вашу помощь!

1 Ответ

1 голос
/ 24 марта 2020

Лучше использовать doOnResponse или doAfterResponseSuccess Это зависит от варианта использования, который является более подходящим.

Однако ожидание RELEASED не должно быть проблемой

Если соединение сбрасывается после кода состояния ошибки, и наблюдатель закрывает это соединение, может ли новый запрос получить такое же соединение параллельно? Внутренняя среда обрабатывает вещи изящно или это условие гонки, которое лишает законной силы вышеуказанный подход?

Пул соединений работает по умолчанию с лизинговой стратегией FIFO, поэтому, если в пуле есть свободные соединения, вы будете не получить такое же соединение, что не имеет место, если вы переключаете пул соединений на лизинговую стратегию LIFO. При обнаружении каждое соединение проверяется, является ли оно активным или нет, и для использования будет предоставлено только активное соединение.

ОБНОВЛЕНИЕ :

Вы также можете попробовать подход, который использует ТОЛЬКО API WebClient, а не Reactor Netty API:

return this.webClient
           .get()
           .uri("/500")
           .retrieve()
           .onStatus(status -> status.equals(HttpStatus.INTERNAL_SERVER_ERROR), clientResponse -> {
                clientResponse.bodyToFlux(DataBuffer.class)
                              .subscribe(new BaseSubscriber<DataBuffer>() {
                                  @Override
                                  protected void hookOnSubscribe(Subscription subscription) {
                                      subscription.cancel();
                                  }
                              });
                return Mono.error(new IllegalStateException("..."));
           })
           .bodyToMono(String.class);
...