Правильная конфигурация для MongoDB Kafka Connect Connector при отправке данных на S3 - PullRequest
0 голосов
/ 09 апреля 2020

Мы используем эту конфигурацию на стороне источника:

{"name": "mongo-source-1",
      "config": {
        "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
        "tasks.max":"3",  
        "connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
        "topic.prefix":"mongo",
        "database":"test",
        "collection":"investigate1",
        "change.stream.full.document": "updateLookup",
        "key.converter":"org.apache.kafka.connect.storage.StringConverter",  
        "key.converter.schemas.enable":"false",
        "value.converter":"org.apache.kafka.connect.storage.StringConverter",
        "value.converter.schemas.enable":"false"
    }}

Но она генерирует данные, которые выглядят так:

"somefield": {
      "$numberLong": "2342423432432432434324"
    }

И затем, когда мы опускаем ее на s3, мы можем не запускает запросы Athena, потому что они ломаются на $.

Как мы генерируем регулярные json из этого соединителя источника, чтобы не столкнуться с проблемами?

Чтобы было ясно, мы просто хотим, чтобы это выглядело так:

"somefield": 2342423432432432434324"

Официальное руководство по соединителю mongodb kafka не поможет, оно даже не обсуждает key.converter и значение Параметры .converter.

Может быть, на стороне приемника есть опция, которая преобразует это?

Вот наша конфигурация приемника:

{
  “name”: “s3-sink-1’“,
  “config”: {
  “connector.class”:“io.confluent.connect.s3.S3SinkConnector”,
  “tasks.max”:“3”,
  “topics.dir”:“topics”,
  “format.class”: “io.confluent.connect.s3.format.json.JsonFormat”,
  “topics”:“mongo.test.investigate1’“,
  “s3.bucket.name”:“superawesomebucketname",
  “s3.region”:“us-east-2",
  “s3.part.size”:“5242880",
  “flush.size”:“1000",
  “rotate.schedule.interval.ms”:“5000",
  “timezone”: “UTC”,
  “key.converter”:“org.apache.kafka.connect.json.JsonConverter”,
  “key.converter.schemas.enable”:“false”,
  “value.converter”:“org.apache.kafka.connect.json.JsonConverter”,
  “value.converter.schemas.enable”:“false”,
  “storage.class”:“io.confluent.connect.s3.storage.S3Storage”
}}
...