Я наткнулся на некоторую полезную информацию, пытаясь решить проблему, с которой столкнулся выше. Я приведу фрагмент кода, который должен справиться с этим, но перед этим важно знать, что вызывает это.
При создании или использовании сообщения или данных для Apache Kafka нам нужна структура схемы для этого сообщения или данных, в моем случае схема Avro. Если в Kafka создается конфликт сообщения, который конфликтует с этой схемой сообщения, это повлияет на потребление.
Добавьте приведенный ниже код в свой потребительский топи c в методе, где он использует записи - -
не забудьте импортировать следующие пакеты:
import org. apache .kafka.common.TopicPartition;
import org.jsoup.SerializationException;
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = null;
try {
records = consumer.poll(10000);
} catch (SerializationException e) {
String s = e.getMessage().split("Error deserializing key/value
for partition ")[1].split(". If needed, please seek past the record to
continue consumption.")[0];
String topics = s.split("-")[0];
int offset = Integer.valueOf(s.split("offset ")[1]);
int partition = Integer.valueOf(s.split("-")[1].split(" at") .
[0]);
TopicPartition topicPartition = new TopicPartition(topics,
partition);
//log.info("Skipping " + topic + "-" + partition + " offset "
+ offset);
consumer.seek(topicPartition, offset + 1);
}
for (ConsumerRecord<String, GenericRecord> record : records) {
System.out.printf("value = %s \n", record.value());
}
}
} finally {
consumer.close();
}