Потоковая Azure передача данных EventHub в ADLS / Blob с использованием Spark Structured Streaming на ежедневной основе - PullRequest
1 голос
/ 10 апреля 2020

У меня есть требование для потоковой передачи событий из Azure EventHub (EH) для ежедневного BLOB-объекта. Для достижения этой цели я использовал структурированную потоковую передачу Spark с опцией Trigger.once () вместе с указанием искровой проверки. Но что мне еще нужно, так это когда я запускаю задание Spark с опцией Trigger.once () ежедневно, мне нужно ежедневно создавать папку («ГГГГ-ММ-ДД») в BLOB-объекте и выгружать все события / данные, которые были доступны в EH для этого конкретного дня.

Для достижения этого я использовал column = 'enqueuedTime', который доступен в схеме EH, как показано ниже, и использовал partitionBy при записи данных в BLOB-объект.

withColumn("year", year(to_date($"enqueuedTime", "MM/dd/yyyy"))).withColumn("month", month(to_date($"enqueuedTime", "MM/dd/yyyy")))

batchSignalsFinalDF
  .writeStream
  .format("JSON")
  .partitionBy("year", "month", "day")
  .option("checkpointLocation", "PATH IN BLOB")
  .outputMode(OutputMode.Append).trigger(Trigger.Once())
  .start("PATH IN BLOB")

Похоже, это не работает для одного из моих вариантов использования, есть ли лучший способ добиться этого !, Любое предложение будет оценено.

...