@ mMorozonv's Выглядит хорошо.У вас может быть один DAG, запускающий поток, если он не существует.Затем второй DAG для проверки работоспособности.Если проверка работоспособности не пройдена, вы можете запустить первый DAG снова.
В качестве альтернативы вы можете запустить поток с интервалом trigger
, равным once
[1].
# Load your Streaming DataFrame
sdf = spark.readStream.load(path="data/", format="json", schema=my_schema)
# Perform transformations and then write…
sdf.writeStream.trigger(once=True).start(path="/out/path", format="parquet")
Это даетВы все те же преимущества потоковой передачи искры, с гибкостью пакетной обработки.
Вы можете просто направить поток на ваши данные, и это задание обнаружит все новые файлы со времени последней итерации (с помощью контрольной точки), запуститепотоковая партия, а затем прекратить.Вы можете запускать расписание вашей группы DAG для воздушных потоков в соответствии с любой задержкой, с которой вы хотите обрабатывать данные (каждую минуту, час и т. Д.).
Я бы не рекомендовал это для требований низкой задержки, но его очень удобно запускать каждую минуту.
[1] https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html