При первом подходе нет необходимости использовать DeadLetterPublishingRecoverer
, вы можете использовать любой ConsumerRecordRecoverer
, который хотите; на самом деле средство восстановления по умолчанию просто регистрирует сообщение об ошибке.
/**
* Construct an instance with the default recoverer which simply logs the record after
* the backOff returns STOP for a topic/partition/offset.
* @param backOff the {@link BackOff}.
* @since 2.3
*/
public SeekToCurrentErrorHandler(BackOff backOff) {
this(null, backOff);
}
И в FailedRecordTracker
...
if (recoverer == null) {
this.recoverer = (rec, thr) -> {
...
logger.error(thr, "Backoff "
+ (failedRecord == null
? "none"
: failedRecord.getBackOffExecution())
+ " exhausted for " + ListenerUtils.recordToString(rec));
};
}
было добавлено отсрочку (и ограничение на количество повторных попыток) обработчик ошибок после добавления повтора в адаптер прослушивателя, поэтому он «новее» (и предпочтительнее).
Кроме того, использование повтора в памяти может вызвать проблемы с перебалансировкой, если используются длинные BackOff
s.
Наконец, только SeekToCurrentErrorHandler
может решать проблемы десериализации (через ErrorHandlingDeserializer
).
EDIT
Используйте ErrorHandlingDeserializer
вместе с а SeekToCurrentErrorHandler
. Исключения десериализации считаются фатальными, и средство восстановления вызывается немедленно.
См. документацию .
Вот простое приложение Spring Boot, которое демонстрирует это:
public class So63236346Application {
private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);
public static void main(String[] args) {
SpringApplication.run(So63236346Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
}
@Bean
ErrorHandler errorHandler() {
return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
+ ex.getMessage()));
}
@KafkaListener(id = "so63236346", topics = "so63236346")
public void listen(String in) {
System.out.println(in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so63236346", "{\"field\":\"value1\"}");
template.send("so63236346", "junk");
template.send("so63236346", "{\"field\":\"value2\"}");
};
}
}
package com.example.demo;
public class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getField() {
return this.field;
}
public void setField(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing
Результат
Thing [field=value1]
2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application : so63236346-0@7
Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
2020-08-10 14:30:14.782 INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
Thing [field=value2]