Не удалось десериализовать данные для topi c в разъем авро-приемника - PullRequest
0 голосов
/ 30 марта 2020

Я использую confluent и подключаю rdbms к kafka, и это работает. некоторые ETL с K SQL работает на. Но когда я хочу вернуть свой поток / таблицу обратно в rdbms, теперь возникает проблема.

проверка моей темы / потока, если есть avro:

./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --property schema.registry.url=http://localhost:8081 --from-beginning --max-messages 1 --topic STR_VAHG_REKEY_02 | jq '.'

результат:

{"REKEY02": {"long": 3941641584970777000}}

, кажется, работает нормально, но когда я его опускаю:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\
tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)\n\
tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)\n\
tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:484)\n\
tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)\n\
tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)\n\
tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\
tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\
tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\
tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\
tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\
tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\
tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\
tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\
tat java.lang.Thread.run(Thread.java:748)\n
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic STR_VAHG_REKEY_02 to Avro: \n\
tat io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:110)\n\
tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:484)\n\
tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)\n\
tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)\n\t... 
13 more\n
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1\n
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n"

вот мой файл-приемник:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=STR_VAHG_REKEY_02
connection.url=jdbc:mysql://
connection.user=
connection.password=
auto.create=true
timestamp.column.name=create_at
validate.non.null=false
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

я уже потерян, любое предложение было бы неплохо

1 Ответ

0 голосов
/ 30 марта 2020

Ваш вывод avro-console-consumer правильно показывает, что значение равно Avro, но ksqlDB записывает сообщение keys в виде строки, а не Avro.

Заменить эти две строки

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081

на

key.converter=org.apache.kafka.connect.storage.StringConverter
...