Ошибка десериализации JSON в AVRO в KSQL: пропуск записи из-за ошибки десериализации - PullRequest
0 голосов
/ 10 июля 2019

Я установил платформу слияния на AWS. Мой источник - MySql, и я подключил его к Kafka connect с помощью разъема debezium. Формат данных из источника - JSON. Теперь в KSQL я создал производную тему и преобразовал тему JSON в AVRO, чтобы сделать возможным передачу данных в MYSQL с помощью JDBC-коннектора. Я использовал следующие запросы:

CREATE STREAM json_stream (userId int, auth_id varchar, email varchar) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON');

производная тема:

create TABLE avro_stream WITH (VALUE_FORMAT='AVRO') AS select * from json_stream;

Я пытался использовать JSON-сообщение напрямую для перехода на mysql, но это не удалось, поскольку коннектору нужна схема, поэтому либо JSON со схемой, либо сообщение Avro помогло бы мне утопить данные.

При использовании из темы avro_stream:

 [2019-07-09 13:27:30,239] WARN task [0_3] Skipping record due to
 deserialization error. topic=[avro_stream] partition=[3] offset=[144]
 (org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
 org.apache.kafka.connect.errors.DataException: avro_stream     at
 io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
    at
 io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:44)
    at
 io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
    at
 org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
    at
 org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
    at
 org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
    at
 org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    at
 org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
    at
 org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at
 org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
    at
 org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
 Caused by: org.apache.kafka.common.errors.SerializationException:
 Error deserializing Avro message for id -1 Caused by:
 org.apache.kafka.common.errors.SerializationException: Unknown magic
 byte!

Конфигурация моего коннектора debezium:

{
"name": "debezium-connector",
"config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.user": "XXXXX",
    "auto.create.topics.enable": "true",
    "database.server.id": "1",
    "tasks.max": "1",
    "database.history.kafka.bootstrap.servers": "X.X.X.X:9092",,
    "database.history.kafka.topic": "XXXXXXX",
    "transforms": "unwrap",
    "database.server.name": "XX-server",
    "database.port": "3306",
    "include.schema.changes": "true",
    "table.whitelist": "XXXX.XXXX",
    "key.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "database.hostname": "X.X.X.X",
    "database.password": "xxxxxxx",
    "value.converter.schemas.enable": "false",
    "name": "debezium-connector",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "database.whitelist": "XXXXX",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter"
},
"tasks": [
    {
        "connector": "debezium-connector",
        "task": 0
    }
],
"type": "source"

}

1 Ответ

0 голосов
/ 10 июля 2019

KSQL записывает ключи как STRING, поэтому, пока вы используете Avro для сериализации значений, ключи - нет.Следовательно, ваш рабочий Sink должен быть настроен следующим образом:

"key.converter": "org.apache.kafka.connect.storage.StringConverter"
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<url to schema registry>",

Если вы сконфигурировали самого своего рабочего на использование Avro, то вы можете переопределить только key.converter для своей конфигурации Connector.

...