Вставка данных в MongoDB с помощью Kafka-MongoDB-Connect - PullRequest
2 голосов
/ 09 мая 2019

Я пытаюсь вставить данные из раздела Kafka в базу данных MongoDB с помощью Kafka-MongoDB-Connect.

Файл свойств Kafka Connect:

group.id=stage
offset.storage.topic=stage_offset
config.storage.topic=stage_config
status.storage.topic=stage_status
bootstrap.servers=****************:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
rest.host.name=localhost
rest.port=9002
plugin.path=/mnt/d/confluent-5.2.1/plugins
offset.storage.file.filename=/tmp/connect.offsets

Данные в теме Кафки:

1557408134887 | 7316|+|2019-05-09 | 7316 | 2019-05-09 | 249.35999999999999
1557408134941 | 6110|+|2019-05-09 | 6110 | 2019-05-09 | 3321.0399999999995
1557408135199 | 11688|+|2019-05-09 | 11688 | 2019-05-09 | 3261.800000000001
1557408134603 | 55247|+|2019-05-09 | 55247 | 2019-05-09 |995.4200000000002

А это ФОРМАТСОН:

{"STOREID":3194,"BUSINESSDATE":"2019-05-09","CA":"1.82"}
{"STOREID":4479,"BUSINESSDATE":"2019-05-09","CA":"7.71"}
{"STOREID":6818,"BUSINESSDATE":"2019-05-09","CA":"121.69"}
{"STOREID":9127,"BUSINESSDATE":"2019-05-09","CA":"67.82"}
{"STOREID":4987,"BUSINESSDATE":"2019-05-09","CA":"40.56"}
{"STOREID":1654,"BUSINESSDATE":"2019-05-09","CA":"26.94"}

Разъем MongoDB:

{
    "name": "mongo-sink-connector",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": "1",
        "topics": "VENTE_PDV_JOUR",
        "mongodb.connection.uri": "mongodb://localhost:27018/stage-project?w=1&journal=true",
        "mongodb.value.projection.type": "whitelist",
        "mongodb.value.projection.list":"STOREID,BUSINESSDATE,CA",
        "mongodb.collection": "ticket-data",
        "flush.size": "100",
        "rotate.interval.ms": "1000",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": false,
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter.schemas.enable": false
    }
}

но я получаю эту ошибку:

enter image description here

...