Spring Cloud Stream Kafka "errorChannel" заголовок - PullRequest
1 голос
/ 28 июня 2019

В настоящее время мы используем Spring Cloud Stream для предоставления SubscribableChannel для ввода и MessageChannel для вывода, и мы используем Spring Integration для обеспечения бизнес-логики между двумя темами Kafka. В других приложениях Spring Integration мы использовали заголовок errorChannel для динамического предоставления канала обработки ошибок (например, изменение канала ошибок при возникновении определенного изменения, которое необходимо будет откатить при ошибке).

Однако входной канал, предоставленный Spring Cloud Stream, не соответствует заголовку errorChannel. Есть ли способ включить это?

Пример кода (пожалуйста, извините за опечатки - рука скопирована с другого устройства)

public interface TestStreams {
     @Input
    SubscribableChannel input();

    @Output
    MessageChannel output();
}
@SpringBootApplication
@EnableBinding(TestSTreams.class)
public class TestService {
    public static void main(String[] args) {
        SpringApplication.run(TestProcessorMicroservice.class, args);
    }

    @Bean
    public IntegrationFlow buildProcessingFlow(TestStreams streams, TransformerThatThrowsException transformer) {
        return IntegrationFlows.from(streams.input())
                .enrichHeaders(ImmutableMap.of(MessageHeaders.ERROR_CHANNEL, "errorChannel")
                .transform(transformer)
                .channel(streams.output())
                .get();
    }

    @Bean
    public IntegrationFlow buildErrorFlow() {
        return IntegrationFlows.from("errorChannel")
            .handle(new MyErrorHandler())
            .get();
    }
}

Приведенный выше код работает правильно, но когда преобразователь выдает исключение, ошибка обрабатывается org.springframework.kafka.listener.LoggingErrorHandler вместо того, чтобы направляться в errorChannel и в конечном итоге обрабатывается MyErrorHandler.

1 Ответ

0 голосов
/ 28 июня 2019

Это не сработает, потому что в KafkaListenerContainer с упомянутым LoggingErrorHandler по умолчанию выбрасывается исключение.errorChannel просто не имеет никакого значения в этом контексте.Поскольку инициатор потока Spring Integration отсутствует.

Если вы хотите обрабатывать .transform(transformer) ошибок, вы можете рассмотреть возможность использования ExpressionEvaluatingRequestHandlerAdvice с его failureChannel функциональностью.

См.его JavaDocs и Справочное руководство для получения дополнительной информации.

Чтобы добавить такой Advice в поток, вам нужно добавить еще один лямбда-аргумент для этого transform().Например:

.transform(transformer, e -> e.advice(myExpressionEvaluatingRequestHandlerAdvice())
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...