Скорее всего, наряду с действительными сообщениями Avro по теме, у вас также есть недействительные. Вот что означает эта ошибка, и это именно та ошибка, которую я получил, когда попытался локально получить сообщение, отличное от Avro, с помощью REST Proxy:
ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@2e20d4f3 (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition avrotest-0 at offset 2. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Я бы использовал такой инструмент, как kafkacat
, для проверки фактических сообщений со смещением, указанным в ошибке, например:
kafkacat -C -b localhost:9092 -t test_topic_avro -o 0 -c 1
-o 0
будет использовать сообщение со смещением 0, а -c 1
означает, что будет использовано только одно сообщение.
Вы также можете искать за проблемным смещением , например, для темы avrotest
переместите смещение на 1
:
echo '{ "offsets": [ { "topic": "avrotest", "partition": 0, "offset": 1 } ] }' | \
http POST localhost:8082/consumers/rmoff_consumer_group/instances/rmoff_consumer_instance/positions \
Content-Type:application/vnd.kafka.v2+json