Как преобразовать значение JSON в ключ сообщения Kafka с помощью Debezium MongoDB Source Connector? - PullRequest
0 голосов
/ 10 октября 2019

Я использую Debezium MongoDB Connector для прослушивания определенной коллекции MongoDB, чтобы каждая запись была в виде сообщения в теме кафки. Это прекрасно работает со следующей конфигурацией подключения kafka:

{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "192.168.0.151:27017",
    "mongodb.name": "mongo",
    "database.whitelist": "database",
    "tasks.max": 1,
    "max.batch.size": 2048,
    "poll.interval.ms": 5000,
    "collection.whitelist": "database.collection"
  }
}

В этой конфигурации каждое сообщение Kafka имеет идентификатор исходной записи данных из MongoDB. Теперь я пытаюсь выполнить преобразование ключа, чтобы получить определенное значение из поля внутри документа JSON в качестве ключа сообщения в kafka. Причина этого заключается в том, что данные должны быть разделены с использованием этого поля.

Я уже пробовал следующий конфиг для создания ключа:

{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "192.168.0.151:27017",
    "mongodb.name": "mongo",
    "database.whitelist": "database",
    "tasks.max": 1,
    "max.batch.size": 2048,
    "poll.interval.ms": 5000,
    "collection.whitelist": "database.collection",
    "transforms":"createKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey", 
    "transforms.createKey.fields": "specific-field-in-mongodb-source-record"
  }
}

Тогда я получаю эту ошибку только в Kafka Connect:

[2019-10-10 11:35:44,049] INFO 2048 records sent for replica set 'dev-shard-01', last offset: {sec=1570707340, ord=1, initsync=true, h=-8774414475389548112} (io.debezium.connector.mongodb.MongoDbConnectorTask)
[2019-10-10 11:35:44,050] INFO WorkerSourceTask{id=mongo-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-10-10 11:35:44,050] INFO WorkerSourceTask{id=mongo-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2019-10-10 11:35:44,050] ERROR WorkerSourceTask{id=mongo-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
java.lang.NullPointerException
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:85)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-10-10 11:35:44,050] ERROR WorkerSourceTask{id=mongo-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

Еще одна конфигурация, которую я пробовал, следующая:

{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "192.168.0.151:27017",
    "mongodb.name": "mongo",
    "database.whitelist": "database",
    "tasks.max": 1,
    "max.batch.size": 2048,
    "poll.interval.ms": 5000,
    "collection.whitelist": "database.collection",
    "transforms": "unwrap,insertKey,extractKey",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.insertKey.fields": "specific-field-in-mongodb-source-record",
    "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractKey.field": "specific-field-in-mongodb-source-record",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}

Это также приводит к ошибке:

[2019-10-10 12:27:04,915] ERROR WorkerSourceTask{id=mongo-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String
        at org.apache.kafka.connect.transforms.util.Requirements.requireStruct(Requirements.java:52)
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:79)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:65)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-10-10 12:27:04,915] ERROR WorkerSourceTask{id=mongo-source-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

Кто-нибудь знает, если и как яможно преобразовать элемент документа JSON из MongoDB в ключ сообщения Kafka?

Спасибо!

1 Ответ

1 голос
/ 10 октября 2019

После еще нескольких испытаний я нашел подходящее решение. Оказывается, мне не нужно третье преобразование. Достаточно просто использовать преобразование ValueToKey.

Для полноты, вот рабочая конфигурация:

{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "192.168.0.151:27017",
    "mongodb.name": "mongo",
    "database.whitelist": "database",
    "tasks.max": 1,
    "max.batch.size": 2048,
    "poll.interval.ms": 5000,
    "collection.whitelist": "database.collection",
    "transforms": "unwrap,insertKey",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode":"drop",
    "transforms.unwrap.operation.header":"true",
    "transforms.insertKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.insertKey.fields": "specific-field-in-mongodb-source-record",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
...