Связка Spring Cloud Stream Rabbitmq - обработка ошибок функции Spring Cloud - PullRequest
3 голосов
/ 27 февраля 2020

Я использую механизм связывания кроликов в весеннем облачном потоке с функцией весеннего облака и определяю таких слушателей, как:

public Function<Flux<SomeObject>, Flux<OtherObject>> foo() {
//some code
}

Я также перенаправляю сообщения о сбоях в DLQ. Проблема в том, что происходит фатальная ошибка типа org.springframework.messaging.converter.MessageConversionException. Он не обрабатывается ConditionalRejectingErrorHandler, как упомянуто в https://docs.spring.io/spring-amqp/reference/html/#exception -обработка , и продолжает работать бесконечно.

Есть ли способ заставить эту работу работать с ConditionalRejectingErrorHandler?

Сейчас я исправляю проблему, используя @ServiceActivator(inputChannel = "errorChannel") и самостоятельно обрабатывая ошибки.

Зависимости:

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
        <dependency>
            <groupId>org.springframework.boot.experimental</groupId>
            <artifactId>spring-boot-starter-data-r2dbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-consul-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-hateoas</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-web</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
</dependencies>

1 Ответ

0 голосов
/ 27 февраля 2020

Мы долго обсуждали обработку ошибок и другие функции, которые мы используем для императивных функций и как они применяются (или даже могут быть применены) к реактивным функциям, и пробовали несколько разных вещей, но, к сожалению, все сводится к несоответствию импеданса.

Подходы, которые вы описываете, основаны на работе с одним сообщением. Это единица работы в обработчиках сообщений императивного стиля, таких как Function<String, String>. Вы используете реактивный стиль и тем самым изменили единицу работы с одного сообщения в потоке на весь поток.

Вкратце:

- Function<?, ?> - unit of work is Message
- Function<Flux<?>, Flux<?>> - unit of work is the entire stream

Вы также можете легко наблюдать это как Реактивная функция вызывается только один раз в течение срока службы приложения, тогда как императив вызывается один раз для каждого поступающего сообщения. Причина, по которой я говорю, состоит в том, что основанные на фреймворке подходы, которые мы используем для обязательных обработчиков сообщений (функций), не могут быть применены к реактивному, не вызывая побочных эффектов. И вообще реактивные разработчики понимают это, особенно учитывая богатство реактивного API, особенно в отношении обработки ошибок

В любом случае мы будем соответствующим образом обновлять документацию.

...