Я читаю данные из Kinesis и записываю их в ElasticEearch через структурированную потоковую передачу Spark.Мне нужно сохранить временную метку, при которой каждая микропакет записывается в индекс ElasticSearch, как часть полей в каждой записи.
Например, первая микропакет из потока содержит записи 10K, временная метка дляэти 10K записи должны отражать момент, когда они были обработаны (или записаны в ElasticSearch).Тогда у нас должна быть новая временная метка, когда обрабатывается вторая микропакет, и так далее.
Я попытался добавить новый столбец с функцией current_timestamp:
.withColumn("recordDate", current_timestamp())
Но похоже, что функция оценивается только один раз за все время существования запроса.В результате все сохраненные записи будут иметь одинаковую временную метку, указывающую момент начала запроса.Таким образом, эта временная метка, по-видимому, представляет «дату начала запроса», а не желаемую, которая представляет «время записи записи».
Было бы действительно здорово, если бы кто-то мог помочь объяснить, как этого можно достичь.
высоко ценится