Как сгенерировать временную метку для каждой микропакета письменных записей в Spark Structured Streaming? - PullRequest
0 голосов
/ 28 сентября 2018

Я читаю данные из Kinesis и записываю их в ElasticEearch через структурированную потоковую передачу Spark.Мне нужно сохранить временную метку, при которой каждая микропакет записывается в индекс ElasticSearch, как часть полей в каждой записи.

Например, первая микропакет из потока содержит записи 10K, временная метка дляэти 10K записи должны отражать момент, когда они были обработаны (или записаны в ElasticSearch).Тогда у нас должна быть новая временная метка, когда обрабатывается вторая микропакет, и так далее.

Я попытался добавить новый столбец с функцией current_timestamp:

.withColumn("recordDate", current_timestamp())

Но похоже, что функция оценивается только один раз за все время существования запроса.В результате все сохраненные записи будут иметь одинаковую временную метку, указывающую момент начала запроса.Таким образом, эта временная метка, по-видимому, представляет «дату начала запроса», а не желаемую, которая представляет «время записи записи».

Было бы действительно здорово, если бы кто-то мог помочь объяснить, как этого можно достичь.

высоко ценится

1 Ответ

0 голосов
/ 28 сентября 2018

Вы можете сделать это с помощью udf, как показано ниже, Вы также можете добавить свое собственное форматирование,

import org.apache.spark.sql.functions.udf

 def current_time = udf(() => {
    java.time.LocalDateTime.now().toString
  })

Чтобы использовать его,

val streamingDF = ???
val streamingDFWithTime .withColumn("time", current_time()))
streamingDFWithTime.writeStream
...
...

PS: я использовал udf вместо встроенного current_timestamp, потому что использование его непосредственно в потоке приводит к проблеме, обсуждаемой здесь и здесь

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...