Ничто не мешает вам настроить оба обработчика ошибок ...
@SpringBootApplication
public class So55001718Application {
public static void main(String[] args) {
SpringApplication.run(So55001718Application.class, args);
}
@KafkaListener(id = "so55001718", topics = "so55001718", errorHandler = "listenerEH")
public void listen(String in) {
System.out.println(in);
if ("bad1".equals(in)) {
throw new IllegalStateException();
}
else if("bad2".equals(in)) {
throw new IllegalArgumentException();
}
}
@Bean
public KafkaListenerErrorHandler listenerEH() {
return (m, t) -> {
if (t.getCause() instanceof IllegalStateException) {
System.out.println(
t.getClass().getSimpleName() + " bad record " + m.getPayload() + " handled by listener EH");
return null;
}
else {
throw (t);
}
};
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.setErrorHandler((t, r) -> {
System.out.println(t.getClass().getSimpleName() + " bad record " + r.value() + " handled by container EH");
});
return factory;
}
@Bean
public NewTopic topic() {
return new NewTopic("so55001718", 1, (short) 1);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so55001718", "good");
template.send("so55001718", "bad1");
template.send("so55001718", "bad2");
};
}
}
и
good
bad1
ListenerExecutionFailedException bad record bad1 handled by listener EH
bad2
ListenerExecutionFailedException bad record bad2 handled by container EH
Вы можете создать простую оболочку для переноса нескольких обработчиков ошибок; не стесняйтесь, чтобы открыть GitHub выпуск (материалы приветствуются).