Spark Kafka Streaming - отправлять оригинальную метку времени вместо текущей - PullRequest
0 голосов
/ 09 ноября 2018

Я использую структурированную потоковую передачу spark для отправки записей в тему кафки. Тема кафки создается с помощью конфига - message.timestamp.type=CreateTime

Это сделано для того, чтобы целевые записи темы Кафки имели ту же метку времени, что и исходные записи.

Мой потоковый код kafka:

kafkaRecords.selectExpr("CAST(key AS STRING)", "CAST(value AS BINARY)","CAST(timestamp AS TIMESTAMP)")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers","IP Of kafka")
    .option("topic",targetTopic)
    .option("kafka.max.in.flight.requests.per.connection", "1")
    .option("checkpointLocation",checkPointLocation)
    .save()

Тем не менее, это не сохраняет исходную метку времени, которая является 2018/11/04, а метка времени отражает самую последнюю дату 2018/11/9.

С другой стороны, просто для подтверждения того, что конфигурация kafka работает, когда я явно создаю записи Kafka Producer и продюсера, имеющие метку времени, и отправляю их, исходная метка времени сохраняется.

Как я могу получить такое же поведение в Kafka Structured Streaming?

1 Ответ

0 голосов
/ 09 ноября 2018

Конфигурация темы CreateTime будет означать, что при создании записей вы получаете то время, которое вы получите.

Непонятно, где вы читаете данные и видите метки времени, если вы запускаете код производителя «сегодня», это время, которое они получают, а не раньше.

Если вам нужны временные метки прошлого, вам действительно нужно, чтобы ваши ProducerRecord s содержали эту временную метку, используя конструктор, который включает параметр временной метки, но Spark не предоставляет его.

Если вы поместите только метку времени в значение полезной нагрузки, как вы это делаете, это время, которое вы захотите провести анализ, вероятно, не ConsumerRecord.timestamp()


Если вы хотите точно скопировать данные из одной темы в другую, Kafka использует MirrorMaker для этого.Тогда вам нужны только файлы конфигурации, а не написание и развертывание кода Spark

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...