У меня есть требование читать потоковые данные из Azure EventHub и ежедневно выгружать их в местоположение BLOB-объектов. Для этого я использовал Spark Structured Streaming с опцией Trigger.once (), чтобы считывать данные из EventHub и выгружать их в ADLS / blob один раз в день в виде пакета.
Ниже приведен код:
def writeStreamer(input: DataFrame, checkPointFolder: String, output: String): StreamingQuery = {
input
.coalesce(1)
.writeStream
.format("JSON")
.partitionBy("year", "month", "day")
.option("checkpointLocation", checkPointFolder)
.option("path", output)
.outputMode(OutputMode.Append)
.trigger(Trigger.Once())
.start()
}
Когда я запускаю задание, используя spark-shell (режим тестирования), данные успешно записываются в выходные каталоги (YYYY-MM-DD), так как мой EventHub хранит данные за 4 дня. Но то, что мне более интересно знать, - предположим, если я соберу jar и разверну его в кластере, чтобы запускать его раз в день, опция Trigger.Once () добавит данные предыдущего дня. То есть, если я запусту банку сегодня рано утром, она добавит оставшиеся вчера данные в папку вчерашнего дня. Работает ли добавление данных без каких-либо проблем, если мы используем опцию Trigger.Once () с указанием чека.
Любая помощь будет принята с благодарностью.