Confluent Kafka Rest Proxy - десериализация Avro - PullRequest
0 голосов
/ 08 марта 2019

Я пытаюсь использовать Confluent Kafka REST Proxy для получения данных в формате Avro из одной из моих тем, но, к сожалению, я получаю ошибку десериализации. Я запрашиваю прокси Kafka REST с помощью следующей команды

 curl -X GET -H "Accept: application/vnd.kafka.avro.v2+json" 
http://localhost:8082/consumers/my-group/instances/my-consumer/records?timeout=30000

И я получаю в ответ

{
  "error_code": 50002,
  "message": "Kafka error: Error deserializing key/value for partition input-0 at offset 0. If needed, please seek past the record to continue consumption."
}

и журналы на прокси-сервере Kafka Rest:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition input-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Данные были получены с использованием KafkaAvroSerializer, и схема присутствует в реестре схем. Также обратите внимание, что данные читаются с помощью avro-console-consumer на CLI.

Кто-нибудь знает, как решить эту проблему?

1 Ответ

0 голосов
/ 08 марта 2019

Скорее всего, наряду с действительными сообщениями Avro по теме, у вас также есть недействительные. Вот что означает эта ошибка, и это именно та ошибка, которую я получил, когда попытался локально получить сообщение, отличное от Avro, с помощью REST Proxy:

ERROR Unexpected exception in consumer read task id=io.confluent.kafkarest.v2.KafkaConsumerReadTask@2e20d4f3  (io.confluent.kafkarest.v2.KafkaConsumerReadTask)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition avrotest-0 at offset 2. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Я бы использовал такой инструмент, как kafkacat, для проверки фактических сообщений со смещением, указанным в ошибке, например:

kafkacat -C -b localhost:9092 -t test_topic_avro -o 0 -c 1

-o 0 будет использовать сообщение со смещением 0, а -c 1 означает, что будет использовано только одно сообщение.

Вы также можете искать за проблемным смещением , например, для темы avrotest переместите смещение на 1:

echo '{ "offsets": [ { "topic": "avrotest", "partition": 0, "offset": 1 } ] }' | \
http POST localhost:8082/consumers/rmoff_consumer_group/instances/rmoff_consumer_instance/positions \
Content-Type:application/vnd.kafka.v2+json
...