Есть ли способ конвертировать строки миллисекунд в формат даты, узнаваемый в mongodb? - PullRequest
1 голос
/ 21 января 2020

Я использую исходный соединитель kafka sql с debezium для отправки данных в разделы, из которых я читаю мой потоковый код scala.

Затем он отправляет потоки в темы. Данные json в этих темах считываются с разъема приемника mongodb, который вставляет json в своем относительном формате mongodb в коллекцию mongodb. Проблема заключается в том, что при отправке в разделы исходный соединитель обрабатывает поля формата даты как строковые миллисекунды, но я хочу сохранить их в формате «date» (скажем, ISO) в mongodb вместо строковых миллисекунд.

Я мог бы использовать SMT TimestampConverter, но для каждой даты требуются отдельные поля, которые больше 100 или даже больше.

Кроме того, если я преобразую строку milliseconds("2147483647000") в моем потоке scala в формат ISO 2038-01-19T03:14:07Z, а затем передам в json, соединитель не распознает формат даты и выдает ошибку, которая это не позволяет '-'. Любой обходной путь к этому?

Ниже приведен пример scala кода, который я написал для отправки фиктивного json топи c через производителя. Предполагается, что соединитель приемника считывает данные из этой топи c, чтобы сохранить ее в коллекции, но соединитель не разрешает '-' в формате даты

 val conf = ConfigFactory.load()
  val props = new Properties
  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getString("bootstrap.servers"))
  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, conf.getString("transactions.application.id"))

  import org.apache.kafka.clients.producer.ProducerConfig

  props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

  import org.apache.kafka.clients.producer.KafkaProducer
  import org.apache.kafka.clients.producer.ProducerRecord

  val a = 2147483647000L

  val instant = Instant.ofEpochMilli(a)
  val json = "{\"id\":0,\"date\":ISODate(\""+instant+"\")}"
  println(json)

  val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)

  // Create a producer record
  val record: ProducerRecord[String, String] = new ProducerRecord[String, String]("date-topic1", json)
  producer.send(record)
  producer.flush()

Ниже я также добавил небольшой снимок ( только преобразования) конфигурации коннектора источника:

transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
transforms.unwrap.drop.tombstones=true
transforms.unwrap.delete.handling.mode=rewrite
transforms.unwrap.drop.deletes=true
...