Обработка ошибок с исходящим шлюзом WebFlux в Spring Integration - PullRequest
0 голосов
/ 10 ноября 2019

Я пытаюсь понять, как следует обрабатывать ошибки в исходящем шлюзе в весенней интеграции с webflux.

в весенней интеграции без webflux int-http: outbound-gateway имеет обработчик ошибок , как показано ниже:

<int-http:outbound-gateway
        http-method="GET"
        url-expression="url"
        expected-response-type="java.lang.String"
        error-handler="accessErrorHandler"
        header-mapper="headerMapper"
        />

, но при весенней интеграции с webflux int-webflux: исходящий-шлюз не имеет обработчика ошибок

<int-webflux:outbound-gateway
                http-method="GET"
                url-expression="url"
                expected-response-type="java.lang.String"
                header-mapper="headerMapper"
        />

это мои зависимости от pom.xml:

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-webflux</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

1 Ответ

2 голосов
/ 10 ноября 2019

Модуль Spring Integration HTTP полностью основан на RestTemplate от Spring Web. У этого есть упомянутый ErrorHandler для его синхронных запросов.

Модуль Spring Integration WebFlux полностью основан на неблокирующей WebClient от основания Spring WebFlux. Внутренняя логика основана на типах Project Reactor, таких как Flux и Mono. Чтобы соответствовать спецификации реактивных потоков, WebFluxRequestExecutingMessageHandler просто возвращает Mono для ответа. В случае каких-либо ошибок при взаимодействии с сервером у нас это есть:

requestSpec.exchange()
                    .flatMap(response -> {
                        HttpStatus httpStatus = response.statusCode();
                        if (httpStatus.isError()) {
                            return response.body(BodyExtractors.toDataBuffers())
                                    .reduce(DataBuffer::write)
                                    .map(dataBuffer -> {
                                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
                                        dataBuffer.read(bytes);
                                        DataBufferUtils.release(dataBuffer);
                                        return bytes;
                                    })
                                    .defaultIfEmpty(new byte[0])
                                    .map(bodyBytes -> {
                                                throw new WebClientResponseException(
                                                        "ClientResponse has erroneous status code: "
                                                                + httpStatus.value() + " "
                                                                + httpStatus.getReasonPhrase(),
                                                        httpStatus.value(),
                                                        httpStatus.getReasonPhrase(),
                                                        response.headers().asHttpHeaders(),
                                                        bodyBytes,
                                                        response.headers().contentType()
                                                                .map(MimeType::getCharset)
                                                                .orElse(StandardCharsets.ISO_8859_1));
                                            }
                                    );
                        }
                        else {
                            return Mono.just(response);
                        }
                    });

Итак, некоторые WebClientResponseException будут брошены в ответ Mono. В любом реактивном или нереактивном нисходящем потоке такое исключение будет обрабатываться следующим образом:

protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
    Object errorChannel = resolveErrorChannel(requestMessage.getHeaders());
    Throwable result = ex;
    if (!(ex instanceof MessagingException)) {
        result = new MessageHandlingException(requestMessage, ex);
    }
    if (errorChannel == null) {
        logger.error("Async exception received and no 'errorChannel' header exists and no default "
                + "'errorChannel' found", result);
    }
    else {
        try {
            sendOutput(new ErrorMessage(result), errorChannel, true);
        }
        catch (Exception e) {
            Exception exceptionToLog =
                    IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage,
                            () -> "failed to send error message in the [" + this + ']', e);
            logger.error("Failed to send async reply", exceptionToLog);
        }
    }
}

Где это errorChannel извлекается из заголовков сообщения запроса и возвращается к глобальному IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME.

Имея все это в руках, вы должны подписаться на такой канал ошибок для обработки этих WebClientResponseException экземпляров соответственно. См. Больше информации о RestTemplate в документации Spring Framework: https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#rest-client-access

...