org.apache.kafka.connect.errors.DataException: недопустимый JSON для значения по умолчанию для массива: "null" - PullRequest
0 голосов
/ 29 октября 2018

Я пытаюсь использовать сливной коннектор Kafka s3 , используя confluent-4.1.1 .

s3 раковина

"value.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"

Когда я запускаю разъемы Kafka для приемника s3, я получаю это сообщение об ошибке:

ERROR WorkerSinkTask{id=singular-s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: Invalid JSON for array default value: "null"
        at io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1562)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1443)
        at io.confluent.connect.avro.AvroData.toConnectSchema(AvroData.java:1323)
        at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:1047)
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Моя схема содержит только 1 поле типа массива, и его схема выглядит следующим образом

{"name":"item_id","type":{"type":"array","items":["null","string"]},"default":[]}

Я могу увидеть десериализованное сообщение с помощью команды kafka-avro-console-consumer. Я видел аналогичный вопрос , но в его случае он также использовал сериализатор Avro для ключа.

./confluent-4.1.1/bin/kafka-avro-console-consumer --topic singular_custom_postback --bootstrap-server localhost:9092  -max-messages 2

"item_id":[{"string":"15552"},{"string":"37810"},{"string":"38061"}]
"item_id":[]

Я не могу поместить весь вывод, полученный от потребителя консоли, так как он содержит конфиденциальную информацию о пользователе, поэтому я добавил единственное поле типа массива в мою схему.

Спасибо заранее.

Ответы [ 2 ]

0 голосов
/ 10 апреля 2019

io.confluent.connect.avro.AvroData.defaultValueFromAvro(AvroData.java:1649) вызывается для преобразования авро-схемы прочитанного сообщения во внутреннюю схему приемника подключений. Я считаю, что это не связано с данными вашего сообщения. Вот почему AbstractKafkaAvroDeserializer может успешно десериализовать ваше сообщение (например, через kafka-avro-console-consumer), так как ваше сообщение является действительным авро-сообщением. Вышеуказанное исключение может произойти, если ваше значение по умолчанию null, а null не является допустимым значением вашего поля. Э.Г.

{
   "name":"item_id",
   "type":{
      "type":"array",
      "items":[
         "string"
      ]
   },
   "default": null
}

Я бы предложил вам удаленно отладить соединение и посмотреть, что именно не работает.

0 голосов
/ 30 октября 2018

Та же проблема, что и у вопроса, с которым вы связались.

В исходном коде вы можете увидеть это условие.

  case ARRAY: {
    if (!jsonValue.isArray()) {
      throw new DataException("Invalid JSON for array default value: " + jsonValue.toString());
    }

И исключение может быть вызвано, когда тип схемы определен в вашем случае как type:"array", но сама полезная нагрузка имеет значение null (или любой другой тип значения), а не фактически массив, несмотря на то, что у вас есть определяется как значение схемы по умолчанию. Значение по умолчанию применяется только тогда, когда элемент items вообще отсутствует, а не тогда, когда "items":null


Кроме этого, я хотел бы предложить такую ​​схему, т. Е. Объект записи, а не просто именованный массив, с пустым массивом по умолчанию, а не null.

{
  "type" : "record",
  "name" : "Items",
  "namespace" : "com.example.avro",
  "fields" : [ {
    "name" : "item_id",
    "type" : {
      "type" : "array",
      "items" : [ "null", "string" ]
    },
    "default": []
  } ]
}
...