Преобразование Debezium ExtractNewRecordState не может работать - PullRequest
1 голос
/ 09 мая 2020

Я создаю синхронизатор данных, который фиксирует изменение данных из MySQL Source и экспортирует данные в куст.

Я решил использовать Kafka Connect для реализации этого. Я использую Debezium в качестве соединителя источника, а сливающиеся hdfs в качестве соединителя приемника.

Debezium обеспечивает преобразование одиночного сообщения , чтобы я мог извлечь поле after из сложного сообщения о событии. Я выполняю ту же конфигурацию, что и в документе, перечисленном в списке, но это не сработало.

{
    // omit ...
    "transform": "unwrap",
    "transform.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}

Я попытался настроить преобразование как на стороне соединителя источника, так и на стороне соединителя приемника, это все еще не работает. Фактически, когда я настраиваю его на стороне исходного коннектора, а затем проверяю сообщение в соответствующем топе c, я обнаружил, что сообщения по-прежнему содержат все поля, включая before, source, et c.

ythh@openstack2:~/confluent-5.5.0$ bin/kafka-avro-console-consumer --from-beginning --bootstrap-server localhost:9092 --topic dbserver1.test_data_1.student3
{"before":null,"after":{"dbserver1.test_data_1.student3.Value":{"id":1,"name":"ggg"}},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589005572000,"snapshot":{"string":"false"},"db":"test_data_1","table":{"string":"student3"},"server_id":1,"gtid":null,"file":"mysql-bin.000011","pos":9474,"row":0,"thread":{"long":6013},"query":null},"op":"c","ts_ms":{"long":1589005572172},"transaction":null}
{"before":null,"after":{"dbserver1.test_data_1.student3.Value":{"id":2,"name":"no way"}},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589005893000,"snapshot":{"string":"false"},"db":"test_data_1","table":{"string":"student3"},"server_id":1,"gtid":null,"file":"mysql-bin.000011","pos":11218,"row":0,"thread":{"long":6030},"query":null},"op":"c","ts_ms":{"long":1589005893773},"transaction":null}
{"before":null,"after":{"dbserver1.test_data_1.student3.Value":{"id":3,"name":"not work"}},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589005900000,"snapshot":{"string":"false"},"db":"test_data_1","table":{"string":"student3"},"server_id":1,"gtid":null,"file":"mysql-bin.000011","pos":11501,"row":0,"thread":{"long":6030},"query":null},"op":"c","ts_ms":{"long":1589005900724},"transaction":null}

Я также проверил журнал подключения kafka, вот некоторые из результатов:

ythh@openstack2:~/kafka_2.12-2.5.0/logs$ cat connect.log | grep transform
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
[2020-05-09 14:29:30,470] INFO    transform.unwrap.type = io.debezium.transforms.ExtractNewRecordState (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:30,470] INFO    transform = unwrap (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:30,471] INFO    transform.unwrap.drop.tombstones = false (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:30,471] INFO    transform.unwrap.delete.handling.mode = rewrite (io.debezium.connector.common.BaseSourceTask:97)
        transforms = []
        transforms = []
[2020-05-09 14:29:32,419] INFO    transform.unwrap.type = io.debezium.transforms.ExtractNewRecordState (io.debezium.connector.common.BaseSourceTask:97)
[2020-05-09 14:29:32,419] INFO    transform = unwrap (io.debezium.connector.common.BaseSourceTask:97)
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []
        transforms = []

1 Ответ

0 голосов
/ 09 мая 2020

Похоже, вы допустили опечатку (transform вместо transforms). Попробуйте эту конфигурацию:

{
    // omit ...
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...