Я пытаюсь зафиксировать MySQL изменения данных с помощью разъема Debezium MySQL в Kafka, а затем записать изменения окончательно в Hive on Had oop через разъем HDFS Sink. Трубопровод имеет вид: MySQL -> Кафка -> Улей.
Разъем приемника настроен, как показано на следующем скриншоте.
{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "customers",
"hdfs.url": "hdfs://192.168.10.15:8020",
"flush.size": "3",
"hive.integration": "true",
"hive.database":"inventory",
"hive.metastore.uris":"thrift://192.168.10.14:9083",
"schema.compatibility":"BACKWARD",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id"
}
}
Кажется, это работает, но запрашивает таблицу Hive, и я вижу измененные данные (обернутый в ключ after
отображается в столбце after
вместо разделения данных на столбцы исходной таблицы.
Вот результат запроса scrrenshot.
Как вы можете видеть в конфигурации приемника, я уже пытаюсь использовать оператор "io.debezium.transforms.UnwrapFromEnvelope"
Дебезиума для разверните сообщение о событии, но, очевидно, оно не работает.
Какие минимальные настройки позволяют мне записывать события изменения БД из Kafka в Hive? Является ли разъем HDFS Sink правильным выбором для этой работы?
Обновление : я проверил это с образцом базы данных «инвентаризации» из базы данных Debezium. Я получил тестовую среду из изображений Debezium, чтобы они были самыми последними. Некоторая информация о версии здесь: debezium 1.0 , kafka 2.0, коннектор приемника confluent-kafka-connect-hdfs: 5.4.1.
обновление 2 : я перешел на использование следующей конфигурации приемника, но все равно не повезло: * 10 28 *
{
"name": "hdfs-sink",
"config": {
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "dbserver1.inventory.customers",
"hdfs.url": "hdfs://172.17.0.8:8020",
"flush.size": "3",
"hive.integration": "true",
"hive.database":"inventory",
"hive.metastore.uris":"thrift://172.17.0.8:9083",
"schema.compatibility":"BACKWARD",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"
}
}