Я пытаюсь обработать исключение на слушателе
@KafkaListener(id = PropertiesUtil.ID,
topics = "#{'${kafka.consumer.topic}'}",
groupId = "${kafka.consumer.group.id.config}",
containerFactory = "containerFactory",
errorHandler = "errorHandler")
public void receiveEvents(@Payload List<ConsumerRecord<String, String>> recordList,
Acknowledgment acknowledgment) {
try {
log.info("Consuming the batch of size {} from kafka topic {}", consumerRecordList.size(),
consumerRecordList.get(0).topic());
processEvent(consumerRecordList);
incrementOffset(acknowledgment);
} catch (Exception exception) {
throwOrHandleExceptions(exception, recordList, acknowledgment);
.........
}
}
Конфигурация контейнера Kafka:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(this.numberOfConsumers);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setConsumerFactory(getConsumerFactory());
factory.setBatchListener(true);
return factory;
}
}
обработчик ошибок слушателя impl
@Bean
public ConsumerAwareListenerErrorHandler errorHandler() {
return (m, e, c) -> {
MessageHeaders headers = m.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
int index = i;
offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
(k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
}
...
};
}
когда я пытаюсь запустить то же самое без пакетной обработки, то я могу получить раздел, значения topi c и смещения, но когда я включаю пакетную обработку и пытаюсь проверить ее, тогда я получаю только два значения внутри заголовков, т.е. id и отметка времени и другие значения не установлены. Я что-то здесь упускаю ??