Я установил платформу слияния на AWS. Мой источник - MySql, и я подключил его к Kafka connect с помощью разъема debezium. Формат данных из источника - JSON. Теперь в KSQL я создал производную тему и преобразовал тему JSON в AVRO, чтобы сделать возможным передачу данных в MYSQL с помощью JDBC-коннектора. Я использовал следующие запросы:
CREATE STREAM json_stream (userId int, auth_id varchar, email varchar) WITH (KAFKA_TOPIC='test', VALUE_FORMAT='JSON');
производная тема:
create TABLE avro_stream WITH (VALUE_FORMAT='AVRO') AS select * from json_stream;
Я пытался использовать JSON-сообщение напрямую для перехода на mysql, но это не удалось, поскольку коннектору нужна схема, поэтому либо JSON со схемой, либо сообщение Avro помогло бы мне утопить данные.
При использовании из темы avro_stream:
[2019-07-09 13:27:30,239] WARN task [0_3] Skipping record due to
deserialization error. topic=[avro_stream] partition=[3] offset=[144]
(org.apache.kafka.streams.processor.internals.RecordDeserializer:86)
org.apache.kafka.connect.errors.DataException: avro_stream at
io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:97)
at
io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:44)
at
io.confluent.ksql.serde.connect.KsqlConnectDeserializer.deserialize(KsqlConnectDeserializer.java:26)
at
org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:65)
at
org.apache.kafka.common.serialization.ExtendedDeserializer$Wrapper.deserialize(ExtendedDeserializer.java:55)
at
org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63)
at
org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
at
org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97)
at
org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at
org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:638)
at
org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:936)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.common.errors.SerializationException:
Error deserializing Avro message for id -1 Caused by:
org.apache.kafka.common.errors.SerializationException: Unknown magic
byte!
Конфигурация моего коннектора debezium:
{
"name": "debezium-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.user": "XXXXX",
"auto.create.topics.enable": "true",
"database.server.id": "1",
"tasks.max": "1",
"database.history.kafka.bootstrap.servers": "X.X.X.X:9092",,
"database.history.kafka.topic": "XXXXXXX",
"transforms": "unwrap",
"database.server.name": "XX-server",
"database.port": "3306",
"include.schema.changes": "true",
"table.whitelist": "XXXX.XXXX",
"key.converter.schemas.enable": "false",
"value.converter.schema.registry.url": "http://localhost:8081",
"database.hostname": "X.X.X.X",
"database.password": "xxxxxxx",
"value.converter.schemas.enable": "false",
"name": "debezium-connector",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"database.whitelist": "XXXXX",
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
},
"tasks": [
{
"connector": "debezium-connector",
"task": 0
}
],
"type": "source"
}