У меня есть требование для потоковой передачи событий из Azure EventHub (EH) для ежедневного BLOB-объекта. Для достижения этой цели я использовал структурированную потоковую передачу Spark с опцией Trigger.once () вместе с указанием искровой проверки. Но что мне еще нужно, так это когда я запускаю задание Spark с опцией Trigger.once () ежедневно, мне нужно ежедневно создавать папку («ГГГГ-ММ-ДД») в BLOB-объекте и выгружать все события / данные, которые были доступны в EH для этого конкретного дня.
Для достижения этого я использовал column = 'enqueuedTime', который доступен в схеме EH, как показано ниже, и использовал partitionBy при записи данных в BLOB-объект.
withColumn("year", year(to_date($"enqueuedTime", "MM/dd/yyyy"))).withColumn("month", month(to_date($"enqueuedTime", "MM/dd/yyyy")))
batchSignalsFinalDF
.writeStream
.format("JSON")
.partitionBy("year", "month", "day")
.option("checkpointLocation", "PATH IN BLOB")
.outputMode(OutputMode.Append).trigger(Trigger.Once())
.start("PATH IN BLOB")
Похоже, это не работает для одного из моих вариантов использования, есть ли лучший способ добиться этого !, Любое предложение будет оценено.