запись данных Debezium Kafka topi c в Hive через HDFS-коннектор не работает - PullRequest
0 голосов
/ 18 марта 2020

Я пытаюсь зафиксировать MySQL изменения данных с помощью разъема Debezium MySQL в Kafka, а затем записать изменения окончательно в Hive on Had oop через разъем HDFS Sink. Трубопровод имеет вид: MySQL -> Кафка -> Улей.

Разъем приемника настроен, как показано на следующем скриншоте.

{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "hdfs.url": "hdfs://192.168.10.15:8020",
    "flush.size": "3",
    "hive.integration": "true",
    "hive.database":"inventory",
    "hive.metastore.uris":"thrift://192.168.10.14:9083",
    "schema.compatibility":"BACKWARD",
    "transforms": "unwrap,key",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.key.field": "id"
  }
}

Кажется, это работает, но запрашивает таблицу Hive, и я вижу измененные данные (обернутый в ключ after отображается в столбце after вместо разделения данных на столбцы исходной таблицы.

Вот результат запроса scrrenshot.

enter image description here

Как вы можете видеть в конфигурации приемника, я уже пытаюсь использовать оператор "io.debezium.transforms.UnwrapFromEnvelope" Дебезиума для разверните сообщение о событии, но, очевидно, оно не работает.

Какие минимальные настройки позволяют мне записывать события изменения БД из Kafka в Hive? Является ли разъем HDFS Sink правильным выбором для этой работы?

Обновление : я проверил это с образцом базы данных «инвентаризации» из базы данных Debezium. Я получил тестовую среду из изображений Debezium, чтобы они были самыми последними. Некоторая информация о версии здесь: debezium 1.0 , kafka 2.0, коннектор приемника confluent-kafka-connect-hdfs: 5.4.1.

обновление 2 : я перешел на использование следующей конфигурации приемника, но все равно не повезло: * 10 28 *

{
  "name": "hdfs-sink",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.inventory.customers",
    "hdfs.url": "hdfs://172.17.0.8:8020",
    "flush.size": "3",
    "hive.integration": "true",
    "hive.database":"inventory",
    "hive.metastore.uris":"thrift://172.17.0.8:9083",
    "schema.compatibility":"BACKWARD",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false"
  }
}
...