Я пытаюсь прочитать сообщение из чистой строковой схемы kafka topi c, используя Spring Kafka API, и затем я получаю сообщение об ошибке от Spring kafka API (нет доступной трассировки стека, в противном случае эта работа не будет такой болезненной), как: " Ошибка десериализации ключа / значения для раздела sometopicname-2 со смещением 98. Если необходимо, просмотрите запись, чтобы продолжить потребление.
Kafka Версии:
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<!-- <version>2.4.5.RELEASE</version> -->
Для kafka topi c он размещен на конфлюэнтной платформе с включенной службой реестра схем.
Схема из реестра реестра API: "schema": "\"string\""
Consumer Config:
protected Map<String, Object> initializeCommonConsumerConfig() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appConfig.getConsumerKafkaBrokerAddress());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, getFetchMaxWaitInMs());
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, getMaxPartitionFetchBytes());
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, getMinFetchBytes());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, getMaxPollRecords());
props.put("security.protocol", "PLAINTEXT");
return props;
}
private ConcurrentKafkaListenerContainerFactory<String,String> buildConcurrentKafkaListenerFactory() {
Map<String, Object> properties = initializeCommonConsumerConfig();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appConfig.getConsumerGroupId());
final ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConcurrency(1);
factory.setBatchListener(false);
factory.setConsumerFactory(
new DefaultKafkaConsumerFactory<>(properties));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
//by default spring kafka is configured to send ack on error, disabling it
factory.getContainerProperties().setAckOnError(false);
factory.setErrorHandler(new ErrorHandler() {
@Override
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
logger.error(thrownException.toString());
String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
logger.debug(s);
String topicsDashPartition = s.split(" at offset ")[0];
logger.debug("topic - partition is: "+topicsDashPartition);
String partitionS = topicsDashPartition.split("-")[topicsDashPartition.split("-").length-1];
logger.debug("partitionS is "+partitionS);
String topics = topicsDashPartition.substring(0,topicsDashPartition.length()-partitionS.length()-1);
int offset = Integer.valueOf(s.split("offset ")[1]);
logger.debug("offset is: " + offset);
int partition = Integer.valueOf(partitionS);
TopicPartition topicPartition = new TopicPartition(topics, partition);
logger.info("Skipping " + topics + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
}
@Override
public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord, Consumer<?,?> consumer) {
logger.error(e.toString());
String s = e.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
logger.debug(s);
String topicsDashPartition = s.split(" at offset ")[0];
logger.debug("topic - partition is: "+topicsDashPartition);
String partitionS = topicsDashPartition.split("-")[topicsDashPartition.split("-").length-1];
logger.debug("partitionS is "+partitionS);
String topics = topicsDashPartition.substring(0,topicsDashPartition.length()-partitionS.length()-1);
int offset = Integer.valueOf(s.split("offset ")[1]);
logger.debug("offset is: " + offset);
int partition = Integer.valueOf(partitionS);
TopicPartition topicPartition = new TopicPartition(topics, partition);
logger.info("Skipping " + topics + "-" + partition + " offset " + offset);
consumer.seek(topicPartition, offset + 1);
}
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> data) {
// TODO Auto-generated method stub
}
});
return factory;
}
@Bean(name = CONTAINERFACTORY)
ConcurrentKafkaListenerContainerFactory<String, String> sampleConcurrentKafkaListenerContainerFactory() {
return buildConcurrentKafkaListenerFactory();
}
@Bean
public KafkaConsumer kafkaConsumer() {
return new KafkaConsumer();
}
И моя функция прослушивателя выглядит следующим образом:
@KafkaListener(id = "${kafka.consumer.groupid}", topics = "${kafka.consumer.topic}", containerFactory = com.rbc.wm.br.config.KafkaConsumerConfig.CONTAINERFACTORY)
public void listen(final Acknowledgment ack, final ConsumerRecord<String, Object> message) {
String rtxml = null;
try {
//TODO internal logic
logger.info("Got message with key {} on offset {}", message.key(),message.offset());
logger.info(message.value().toString());
/*
* Logic to decode retrieve record
* */
ack.acknowledge();
}
catch(Exception ex) {
logger.error("Exception occurred while decoding {}. Incorrect or Corrupted message.", message.key(), ex);
ack.acknowledge();
}
}
Есть ли какая-либо конфигурация, которую я пропустил при настройке этого потребителя?
Или есть способ увидеть всю трассировку стека в исключении, а не только одно сообщение об исключении от Spring Kafka? Я пытаюсь запустить в режим отладки, однако его там еще не было
Спасибо, что прочитали мой вопрос, и, пожалуйста, посоветуйте, если здесь что-то не так.