Мне нужно обработать огромный фрейм данных, загрузить файлы из сервиса по столбцу id
фрейма данных.Логика для загрузки и все изменения подготовлены, но я не уверен, что это лучший способ сделать цикл вокруг этого.Я запускаю это на Databricks , поэтому мне нужно выполнять процессы в чанках.
В кадре данных есть столбец " status ", который может содержать следующиезначения:
"задача", "обработка", "сбой", "успешно"
В цикле while я хочу выполнить следующие задачи:
while (there are rows with status "todo") {
- get the first 10 rows if status is todo (DONE)
- start processing the dataframe, update status to processing (DONE)
- download files (call UDF), update status to succeeded or failed
(DONE, not in the code here)
}
Я бы хотел запустить этот , пока все строки 'status
не будут отличаться от todo
!Проблема в том, что этот цикл while не заканчивается, потому что сам фрейм данных не обновляется.Он должен быть назначен другому фрейму данных, но тогда как добавить новый в цикл?
Мой код прямо сейчас:
while(statusDoc.where("status == 'todo'").count > 0) {
val todoDF = test.filter("status == 'todo'")
val processingDF = todoDF.limit(10).withColumn("status", when(col("status") === "todo", "processing")
.otherwise(col("status")))
statusDoc.join(processingDF, Seq("id"), "outer")
.select($"id", \
statusDoc("fileUrl"), \
coalesce(processingDF("status"), statusDoc("status")).alias("status"))
}
Объединение должно выглядеть следующим образом:
val update = statusDoc.join(processingDF, Seq("id"), "outer")
.select($"id", statusDoc("fileUrl"),\
coalesce(processingDF("status"), statusDoc("status")).alias("status"))
Затем этот новый update
фрейм данных следует использовать для следующего цикла цикла.