У меня есть рабочее решение, но я ищу некоторые способы сделать это безопаснее и лучше.
Каждый раз, когда запускается работа, она ищет пользовательскую контрольную точку, которая указывает, из какого дата должна начать обработку. Из исходного кадра данных я создаю тот, который начинается с указанной даты начала - на основе контрольной точки. Теперь решение ограничивает строки обрабатываемого кадра данных:
val readFormat = "delta"
val sparkRead = spark.read.format(readFormat)
val fileFormat = if (readFormat == "delta") "" else "." + readFormat
val testData = sparkRead
.load(basePath + "/testData/table_name" + fileFormat)
.where(!((col("size") < 1)))
.where($"modified" >= start)
.limit(5000)
Для каждого идентификатора я загружаю файлы из Azure Хранилище и сохраняю содержимое в новом столбце кадра данных:
val tryDownload = testData
.withColumn(
"fileStringPreview",
downloadUDF($"id"))
.withColumn(
"status",
when(
(($"fileStringPreview"
.startsWith("failed:") === true) ||
($"fileStringPreview"
.startsWith("emptyUrl") === true)),
lit("failed")).otherwise(
lit("succeeded")))
Когда это будет сделано, контрольная точка обновляется до самой последней измененной даты из элементов, обработанных в этой итерации.
def saveLatest(saved_df: DataFrame, timeSeriesColName: String): Unit = {
val latestTime = saved_df.agg(max(timeSeriesColName)).collect()(0)
try {
val timespanEnd = latestTime.getTimestamp(0).toInstant().toEpochMilli()
saveTimestamp(timespanEnd) // this function actually stores the data
} catch {
case e: java.lang.NullPointerException => {
LoggingWrapper.log("timespanEnd is null");
}
}
}
saveLatest(tryDownload, "modified")
Меня беспокоит это решение limit (5000) , есть ли лучший способ, позволяющий сохранять хорошую производительность при загрузке указанного числа файлов в каждой итерации?
Спасибо за предложения заранее! :)