MongoDB Kafka Sink Connector не обрабатывает процессор RenameByRegex - PullRequest
1 голос
/ 13 апреля 2020

Мне нужно прослушать события из Кафки Топи c и отправить в коллекцию в MongoDB. Сообщение содержит вложенный объект со свойством id, как в примере выше.

{
    "testId": 1,
    "foo": "bar",
    "foos": [{ "id":"aaaaqqqq-rrrrr" }]
}

Я пытаюсь переименовать этот вложенный идентификатор в _id с RegExp

{
        "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
        "topics": "test",
        "connection.uri": "mongodb://mongo:27017",
        "database": "test_db",
        "collection": "test",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
        "value.projection.list":"testId",
        "value.projection.type": "whitelist",
        "post.processor.chain": "com.mongodb.kafka.connect.sink.processor.DocumentIdAdder, com.mongodb.kafka.connect.sink.processor.field.renaming.RenameByRegex",
        "field.renamer.regexp": "[{\"regexp\":\"\b(id)\b\", \"pattern\":\"\b(id)\b\",\"replace\":\"_id\"}]"
    }

И результат конфигурации / проверки будет 500 Internal Server Error, с этим сообщением:

{
    "error_code": 500,
    "message": null
}

Я что-то упустил или это проблема?

1 Ответ

0 голосов
/ 13 апреля 2020

Я думаю, все, что вам нужно, это Kafka Connect Single Message Transform (SMT) , а точнее ReplaceField:

Фильтровать или переименовывать поля внутри a Структура или Карта.


Следующее заменит id имя поля на _id:

"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "id:_id"

В вашем случае, перед применением Вы можете также ввести Flatten foos:

"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "."

и, наконец, применить преобразование для переименования поля:

"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "foos.id:foos._id"
...