Разъем Debezium Kafka CD C делает ключ авро, даже когда конвертер StringConverver - PullRequest
0 голосов
/ 02 мая 2020

Это мои коннекторы:

curl -s -k -X POST http://***************:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "mysql-cdc-CUSTOMER_DETAILS-007",
  "config": {
    "tasks.max":"2",
    "poll.interval.ms":"500",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "dbnode",
    "database.port": "3306",
    "database.user": "**********",
    "database.password": "###########",
    "database.server.name": "dbnode",
    "database.whitelist": "device_details",
    "database.history.kafka.bootstrap.servers": "**********:9092",
    "database.history.kafka.topic": "schema-changes.device_details",
    "include.schema.changes":"true",
    "table.whitelist":"device_details.tb_customermst",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://************:8081",
    "internal.key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "internal.key.converter.schemas.enable":"false",
    "internal.value.converter.schemas.enable":"false"
  }
}' | jq '.'

При использовании данных из k sql он выглядит следующим образом:

ksql> print 'Device_Details.device_details.tb_customermst' from beginning;
Format:AVRO
5/2/20 2:08:34 PM IST, Struct{customerid=10001}, {"before": null, "after": {"customerid": 10001, "firstname": "Klara", "lastname": "Djokic", "emailid": "klara.djokic007@iillii.org", "mobilenumber": "+1 (480) 361-5311", "customertype": "Commercial", "emailverified": 1, "mobileverified": 1, "city": "Gilbert", "postcode": "85296", "address": "3426 E Elgin St", "latitude": 33.29840528, "longitude": -111.71571314, "UPDATE_TS": "2020-05-02T08:38:33Z"}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "Device_Details", "ts_ms": 1588408713000, "snapshot": "false", "db": "device_details", "table": "tb_customermst", "server_id": 1, "gtid": "98557612-65ba-11ea-8dc4-000c29bcb2b4:6", "file": "binlog.000044", "pos": 4417, "row": 0, "thread": 8, "query": null}, "op": "c", "ts_ms": 1588408713761, "transaction": null}

Ключ Struct {customerid = 10001} и мне нужен ключ как 10001

Как этого добиться ...

При использовании ValueToKey и ExtractField $ Key В журнале подключения SMT выдается следующая ошибка:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:315)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
    at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)

Может кто-нибудь подсказать, что нужно сделать, чтобы получить ключ как 10001.

Спасибо в advance.

PS: я использую .... Confluent Platform 5.4.0 .... Разъем Debezium для MySql 1.1.0

1 Ответ

2 голосов
/ 02 мая 2020

Ты почти у цели. Вам нужно использовать ExtractField$Key transform отдельно (т.е. без ValueToKey), чтобы вывести значение из структуры.

"transforms":"extractKeyfromStruct",
"transforms.extractKeyfromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyfromStruct.field":"customerid",
...