Обработка записей захоронения, поступающих из потоков Ksqldb в соединителе приемника jdb c - PullRequest
0 голосов
/ 13 июля 2020

Поскольку Kstreams в ksqldb поддерживает примитивный тип данных (String или BigInt), есть ли способ обрабатывать записи-захоронения, поступающие из Kstreams?

В ValueToKey SMT значения, которые могут быть преобразованные в PK должны быть частью столбцов таблицы / белого списка MySQL и не могут быть занесены в черный список. Есть ли способ получить значение из одного поля столбца и сопоставить его с разными именами ключей?

Это будет полезно для записей-захоронений, пока поддержка KEY не будет выпущена в Confluent.

Пример - в потоках K SQL,

KEY - {PK1=ABC, PK2=1} (STRING format)
VALUE - {PK1=null, PK2=null, COL3=null, COL4=null} (AVRO format)

Поскольку формат KEY равен STRING, формат ключа соединителя приемника должен быть STRING, а не JSON / AVRO. В этом сценарии есть ли способ обработать запись надгробия?

Я создал два столбца с суффиксом _KEY для полей PK, чтобы я мог сопоставить значения этих _KEY столбцов с фактическими столбцами PK и занести в черный список столбцы суффикса _KEY.

KEY - {PK1=ABC, PK2=1} (STRING format)
VALUE - {PK1_KEY=ABC, PK2_KEY=1, PK1=null, PK2=null, COL3=null, COL4=null} (AVRO format)

В коннекторе приемника я использовал ValueToKey SMT, чтобы соединитель приемника удалял запись из MySQL согласно значению из суффикса _KEY columns и взяв исходные имена столбцов, например -

delete from TEST where PK1='ABC' and PK2=1;
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "PK1,PK2",
"transforms": "ValueToKey,ReplaceField",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "PK1_KEY,PK2_KEY",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.whitelist": "PK1,PK2,COL3,COL4"

Есть ли способ добиться этого? Когда я попытался использовать приведенную выше конфигурацию приемника, у меня возникла ошибка в разъеме приемника.

...