В настоящее время мы используем Spring Cloud Stream для предоставления SubscribableChannel
для ввода и MessageChannel
для вывода, и мы используем Spring Integration для обеспечения бизнес-логики между двумя темами Kafka. В других приложениях Spring Integration мы использовали заголовок errorChannel
для динамического предоставления канала обработки ошибок (например, изменение канала ошибок при возникновении определенного изменения, которое необходимо будет откатить при ошибке).
Однако входной канал, предоставленный Spring Cloud Stream, не соответствует заголовку errorChannel
. Есть ли способ включить это?
Пример кода (пожалуйста, извините за опечатки - рука скопирована с другого устройства)
public interface TestStreams {
@Input
SubscribableChannel input();
@Output
MessageChannel output();
}
@SpringBootApplication
@EnableBinding(TestSTreams.class)
public class TestService {
public static void main(String[] args) {
SpringApplication.run(TestProcessorMicroservice.class, args);
}
@Bean
public IntegrationFlow buildProcessingFlow(TestStreams streams, TransformerThatThrowsException transformer) {
return IntegrationFlows.from(streams.input())
.enrichHeaders(ImmutableMap.of(MessageHeaders.ERROR_CHANNEL, "errorChannel")
.transform(transformer)
.channel(streams.output())
.get();
}
@Bean
public IntegrationFlow buildErrorFlow() {
return IntegrationFlows.from("errorChannel")
.handle(new MyErrorHandler())
.get();
}
}
Приведенный выше код работает правильно, но когда преобразователь выдает исключение, ошибка обрабатывается org.springframework.kafka.listener.LoggingErrorHandler
вместо того, чтобы направляться в errorChannel
и в конечном итоге обрабатывается MyErrorHandler
.