SerializationException: Ошибка десериализации сообщения Avro - PullRequest
0 голосов
/ 21 января 2020

Я получил ошибку при создании Kafka JdbcSinkConnector (моя задача - перенести данные из таблицы Kafka topi c в Postgres):

Причина: org. apache .kafka. common.errors.SerializationException: Ошибка десериализации сообщения Avro для идентификатора -1

Что означает идентификатор -1?

Настройки для соединителя:

{
  "name": "MVM Test",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "mvm_test_events"
  ],
  "connection.url": "jdbc:connection",
  "connection.user": "user",
  "connection.password": "*************"
}

Также Я описал схему для (значение) topi c "mvm_test_events" в Центре управления:

{
  "type": "record",
  "name": "event",
  "namespace": "mvm",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "series_storage",
      "type": "int"
    },
    {
      "name": "type",
      "type": "int"
    },
    {
      "name": "entity_id",
      "type": "int"
    },
    {
      "name": "processing_ts",
      "type": "double"
    },
    {
      "name": "from_ts",
      "type": "double"
    },
    {
      "name": "to_ts",
      "type": "string"
    },
    {
      "name": "context",
      "type": {
        "type": "record",
        "name": "context",
        "fields": [
          {
            "name": "trainName",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Журналы ошибок:

> [2020-01-22 06:45:10,380] ERROR Error encountered in task
> mvm-test-events-0. Executing stage 'VALUE_CONVERTER' with class
> 'io.confluent.connect.avro.AvroConverter', where consumed record is
> {topic='mvm_test_events', partition=0, offset=14,
> timestamp=1579615711794, timestampType=CreateTime}.
> (org.apache.kafka.connect.runtime.errors.LogReporter)
> org.apache.kafka.connect.errors.DataException: Failed to deserialize
> data for topic mvm_test_events to Avro:   at
> io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>   at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id -1 Caused by:
> org.apache.kafka.common.errors.SerializationException: Unknown magic
> byte! [2020-01-22 06:45:10,381] ERROR
> WorkerSinkTask{id=mvm-test-events-0} Task threw an uncaught and
> unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded
> in error handler  at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>   at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.kafka.connect.errors.DataException: Failed to deserialize
> data for topic mvm_test_events to Avro:   at
> io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   ... 13 more Caused by:
> org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id -1 Caused by:
> org.apache.kafka.common.errors.SerializationException: Unknown magic
> byte! [2020-01-22 06:45:10,381] ERROR
> WorkerSinkTask{id=mvm-test-events-0} Task is being killed and will not
> recover until manually restarted
> (org.apache.kafka.connect.runtime.WorkerTask)

Насколько я вижу, он пытается преобразовать запись в топи c с io.confluent.connect.avro.AvroConverter. Теперь я должен определить имя схемы (я описал в настройках topi c) в настройках соединителя "Класс преобразователя значений"?

1 Ответ

1 голос
/ 21 января 2020

Вы получаете сообщение об ошибке

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

при использовании AvroConverter в Kafka Connect для чтения данных, которые не были сериализованы как Avro .

либо нужно исправить своего производителя (и правильно сериализовать данные как Avro), либо, если вы не хотели использовать Avro, то исправьте конфигурацию коннектора Kafka Connect, чтобы использовать соответствующий конвертер.

Для получения дополнительной информации см. Эту статью .

Редактировать : Исходя из вашего обновленного вопроса, вы собираетесь писать как Avro, поэтому используйте Авроконвертер правильный. Вы не включили его в конфигурацию вашего коннектора, поэтому я предполагаю, что он уже установлен в ваших рабочих свойствах Kafka Connect ("value.converter": "io.confluent.connect.avro.AvroConverter"). Каким-то образом у вас есть данные на вашем topi c, который не Avro. Я предлагаю вам настроить очередь недоставленных сообщений , которая будет направлять эти сообщения в новую топи c для проверки, в то же время позволяя вашему приемнику продолжить обработку сообщений, которые являются Avro.

...