org.apache.kafka.connect.errors.DataException: недопустимый JSON для значения записи по умолчанию: null - PullRequest
0 голосов
/ 27 июня 2018

У меня есть тема Kafka Avro, созданная с помощью KafkaAvroSerializer.
Мои автономные свойства, как показано ниже.
Я использую Confluent 4.0.0 для запуска Kafka connect.

key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=<schema_registry_hostname>:8081
value.converter.schema.registry.url=<schema_registry_hostname>:8081
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

Когда я запускаю соединители Kafka для приемника hdfs в автономном режиме, я получаю это сообщение об ошибке:

[2018-06-27 17:47:41,746] ERROR WorkerSinkTask{id=camus-email-service-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Invalid JSON for record default value: null
    at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1640)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1527)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1410)
    at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1290)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1014)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:88)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:454)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2018-06-27 17:47:41,748] ERROR WorkerSinkTask{id=camus-email-service-0} Task is being killed and will not recover until manually restarted (    org.apache.kafka.connect.runtime.WorkerTask)
[2018-06-27 17:52:19,554] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect).

Когда я использую kafka-avro-console-consumer, передавая реестр схемы, я получаю десериализованные сообщения Kafka.

т.е:.

/usr/bin/kafka-avro-console-consumer --bootstrap-server <kafka-host>:9092 --topic <KafkaTopicName> --property schema.registry.url=<schema_registry_hostname>:8081

Ответы [ 2 ]

0 голосов
/ 09 июля 2018

Изменение типа данных столбца «подписка» на тип данных Union устранило проблему. Авроконвертеры смогли десериализовать сообщения.

0 голосов
/ 28 июня 2018

Я думаю, твой ключ Кафки нулевой, а не Avro.

Или это какой-то другой тип, но уродливый и не преобразованный в тип данных RECORD. См. Исходный код AvroData

case RECORD: {
    if (!jsonValue.isObject()) {
      throw new DataException("Invalid JSON for record default value: " + jsonValue.toString());
    }

ОБНОВЛЕНИЕ Согласно вашему комментарию, вы можете увидеть, что это правда

$ curl -X GET localhost:8081/subjects/<kafka-topic>-key/versions/latest    
{"subject":"<kafka-topic>-key","version":2,"id":625,"schema":"\"bytes\""}

В любом случае HDFS Connect не хранит ключ по умолчанию, поэтому старайтесь не десериализовать ключ вообще, а не использовать Avro.

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Кроме того, ваш потребитель консоли не печатает ключ, поэтому ваш тест не подходит. Вам нужно добавить --property print.key=true

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...