Мы используем эту конфигурацию на стороне источника:
{"name": "mongo-source-1",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max":"3",
"connection.uri":"mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
"topic.prefix":"mongo",
"database":"test",
"collection":"investigate1",
"change.stream.full.document": "updateLookup",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":"false"
}}
Но она генерирует данные, которые выглядят так:
"somefield": {
"$numberLong": "2342423432432432434324"
}
И затем, когда мы опускаем ее на s3, мы можем не запускает запросы Athena, потому что они ломаются на $.
Как мы генерируем регулярные json из этого соединителя источника, чтобы не столкнуться с проблемами?
Чтобы было ясно, мы просто хотим, чтобы это выглядело так:
"somefield": 2342423432432432434324"
Официальное руководство по соединителю mongodb kafka не поможет, оно даже не обсуждает key.converter и значение Параметры .converter.
Может быть, на стороне приемника есть опция, которая преобразует это?
Вот наша конфигурация приемника:
{
“name”: “s3-sink-1’“,
“config”: {
“connector.class”:“io.confluent.connect.s3.S3SinkConnector”,
“tasks.max”:“3”,
“topics.dir”:“topics”,
“format.class”: “io.confluent.connect.s3.format.json.JsonFormat”,
“topics”:“mongo.test.investigate1’“,
“s3.bucket.name”:“superawesomebucketname",
“s3.region”:“us-east-2",
“s3.part.size”:“5242880",
“flush.size”:“1000",
“rotate.schedule.interval.ms”:“5000",
“timezone”: “UTC”,
“key.converter”:“org.apache.kafka.connect.json.JsonConverter”,
“key.converter.schemas.enable”:“false”,
“value.converter”:“org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”:“false”,
“storage.class”:“io.confluent.connect.s3.storage.S3Storage”
}}