Я использую исходный соединитель kafka sql с debezium для отправки данных в разделы, из которых я читаю мой потоковый код scala.
Затем он отправляет потоки в темы. Данные json в этих темах считываются с разъема приемника mongodb, который вставляет json в своем относительном формате mongodb в коллекцию mongodb. Проблема заключается в том, что при отправке в разделы исходный соединитель обрабатывает поля формата даты как строковые миллисекунды, но я хочу сохранить их в формате «date» (скажем, ISO) в mongodb вместо строковых миллисекунд.
Я мог бы использовать SMT TimestampConverter
, но для каждой даты требуются отдельные поля, которые больше 100 или даже больше.
Кроме того, если я преобразую строку milliseconds("2147483647000")
в моем потоке scala в формат ISO 2038-01-19T03:14:07Z
, а затем передам в json, соединитель не распознает формат даты и выдает ошибку, которая это не позволяет '-'. Любой обходной путь к этому?
Ниже приведен пример scala кода, который я написал для отправки фиктивного json топи c через производителя. Предполагается, что соединитель приемника считывает данные из этой топи c, чтобы сохранить ее в коллекции, но соединитель не разрешает '-' в формате даты
val conf = ConfigFactory.load()
val props = new Properties
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getString("bootstrap.servers"))
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, conf.getString("transactions.application.id"))
import org.apache.kafka.clients.producer.ProducerConfig
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
val a = 2147483647000L
val instant = Instant.ofEpochMilli(a)
val json = "{\"id\":0,\"date\":ISODate(\""+instant+"\")}"
println(json)
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
// Create a producer record
val record: ProducerRecord[String, String] = new ProducerRecord[String, String]("date-topic1", json)
producer.send(record)
producer.flush()
Ниже я также добавил небольшой снимок ( только преобразования) конфигурации коннектора источника:
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.drop.deletes=true