Я использую Debezium для обнаружения изменений в исходных таблицах MySql.Как я могу создать сообщения Kafka таким образом, чтобы ключ представлял собой числовое (Long
) значение вместо объекта Json?
Что я получаю:
key: {"foo_id": 123}
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}
Что я хочу:
key: 123
value: {"foo_id": 123, "bar": "blahblah", "baz": "meh......"}
Моя таблица FOO выглядит следующим образом:
foo_id: INT
bar: VARCHAR
baz: VARCHAR
Обратите внимание, что я не использую avro, и я экспериментировал с несколькими комбинациями ниже (с / без ключевых преобразователей)) но не удалось получить ключ Long
.
"transforms": "unwrap,insertKey,extractKey",
"transforms.unwrap.type":"io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones":"false",
"transforms.insertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.insertKey.fields":"foo_id",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"foo_id",
"key.converter" : "org.apache.kafka.connect.converters.LongConverter",
"key.converter.schemas.enable": "false",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
Я не уверен, что ValueToKey или ExtractField работает для (MySQL) Source, но я получаю ниже NPE.
Caused by: java.lang.NullPointerException
at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:44)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)