Поскольку Kstreams в ksqldb поддерживает примитивный тип данных (String
или BigInt
), есть ли способ обрабатывать записи-захоронения, поступающие из Kstreams?
В ValueToKey
SMT значения, которые могут быть преобразованные в PK
должны быть частью столбцов таблицы / белого списка MySQL и не могут быть занесены в черный список. Есть ли способ получить значение из одного поля столбца и сопоставить его с разными именами ключей?
Это будет полезно для записей-захоронений, пока поддержка KEY не будет выпущена в Confluent.
Пример - в потоках K SQL,
KEY - {PK1=ABC, PK2=1} (STRING format)
VALUE - {PK1=null, PK2=null, COL3=null, COL4=null} (AVRO format)
Поскольку формат KEY
равен STRING
, формат ключа соединителя приемника должен быть STRING
, а не JSON / AVRO. В этом сценарии есть ли способ обработать запись надгробия?
Я создал два столбца с суффиксом _KEY
для полей PK, чтобы я мог сопоставить значения этих _KEY
столбцов с фактическими столбцами PK и занести в черный список столбцы суффикса _KEY
.
KEY - {PK1=ABC, PK2=1} (STRING format)
VALUE - {PK1_KEY=ABC, PK2_KEY=1, PK1=null, PK2=null, COL3=null, COL4=null} (AVRO format)
В коннекторе приемника я использовал ValueToKey
SMT, чтобы соединитель приемника удалял запись из MySQL согласно значению из суффикса _KEY
columns и взяв исходные имена столбцов, например -
delete from TEST where PK1='ABC' and PK2=1;
"delete.enabled": "true",
"pk.mode": "record_key",
"pk.fields": "PK1,PK2",
"transforms": "ValueToKey,ReplaceField",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields": "PK1_KEY,PK2_KEY",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.whitelist": "PK1,PK2,COL3,COL4"
Есть ли способ добиться этого? Когда я попытался использовать приведенную выше конфигурацию приемника, у меня возникла ошибка в разъеме приемника.