Я использую структурированную потоковую передачу spark для отправки записей в тему кафки. Тема кафки создается с помощью конфига - message.timestamp.type=CreateTime
Это сделано для того, чтобы целевые записи темы Кафки имели ту же метку времени, что и исходные записи.
Мой потоковый код kafka:
kafkaRecords.selectExpr("CAST(key AS STRING)", "CAST(value AS BINARY)","CAST(timestamp AS TIMESTAMP)")
.write
.format("kafka")
.option("kafka.bootstrap.servers","IP Of kafka")
.option("topic",targetTopic)
.option("kafka.max.in.flight.requests.per.connection", "1")
.option("checkpointLocation",checkPointLocation)
.save()
Тем не менее, это не сохраняет исходную метку времени, которая является 2018/11/04, а метка времени отражает самую последнюю дату 2018/11/9.
С другой стороны, просто для подтверждения того, что конфигурация kafka работает, когда я явно создаю записи Kafka Producer и продюсера, имеющие метку времени, и отправляю их, исходная метка времени сохраняется.
Как я могу получить такое же поведение в Kafka Structured Streaming?