Соединитель JDBC Apache Kafka - исключение SerializationException: неизвестный магический байт - PullRequest
0 голосов
/ 13 марта 2019

Мы пытаемся записать значения из темы в базу данных postgres, используя Confluent JDBC Sink Connector.

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=xxx
tasks.max=1
topics=topic_name
auto.evolve=true
connection.user=confluent_rw
auto.create=true
connection.url=jdbc:postgresql://x.x.x.x:5432/Datawarehouse
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

Мы можем прочитать значение в консоли, используя:

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic topic_name

Схема существует, и значение корректно десериализуется с помощью kafka-avro-console-consumer, поскольку она не выдает ошибок, а соединитель выдает эти ошибки:

  {
  "name": "datawarehouse_sink",
  "connector": {
    "state": "RUNNING",
    "worker_id": "x.x.x.x:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "FAILED",
      "worker_id": "x.x.x.x:8083",
      "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.DataException: f_machinestate_sink\n\tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:511)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 13 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"
    }
  ],
  "type": "sink"
}

Окончательная ошибка:

org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

Схема зарегистрирована в реестре схемы.

Проблема связана с файлом конфигурации соединителя?

1 Ответ

1 голос
/ 13 марта 2019

Ошибка org.apache.kafka.common.errors.SerializationException: Unknown magic byte! означает, что сообщение по теме не было допустимым Avro и не могло быть десериализовано. Это может быть несколько причин:

  1. Некоторые сообщения - Avro, а другие - нет. В этом случае вы можете использовать возможности обработки ошибок в Kafka Connect, чтобы игнорировать недействительные сообщения, используя конфигурацию, подобную этой:

    "errors.tolerance": "all",
    "errors.log.enable":true,
    "errors.log.include.messages":true
    
  2. Значение - это Avro, а клавиши - нет. Если это так, используйте соответствующий key.converter.

...