Обрабатывать только несколько файлов за один раунд - PullRequest
2 голосов
/ 29 апреля 2020

У меня есть рабочее решение, но я ищу некоторые способы сделать это безопаснее и лучше.

Каждый раз, когда запускается работа, она ищет пользовательскую контрольную точку, которая указывает, из какого дата должна начать обработку. Из исходного кадра данных я создаю тот, который начинается с указанной даты начала - на основе контрольной точки. Теперь решение ограничивает строки обрабатываемого кадра данных:

val readFormat = "delta"
val sparkRead = spark.read.format(readFormat)

val fileFormat = if (readFormat == "delta") "" else "." + readFormat
val testData = sparkRead
                  .load(basePath + "/testData/table_name" + fileFormat)
                  .where(!((col("size") < 1)))
                  .where($"modified" >= start)
                  .limit(5000)

Для каждого идентификатора я загружаю файлы из Azure Хранилище и сохраняю содержимое в новом столбце кадра данных:

val tryDownload = testData
                    .withColumn(
                        "fileStringPreview",
                        downloadUDF($"id"))
                     .withColumn(
                            "status",
                            when(
                              (($"fileStringPreview"
                                .startsWith("failed:") === true) ||
                               ($"fileStringPreview"
                                .startsWith("emptyUrl") === true)),
                              lit("failed")).otherwise(
                              lit("succeeded")))

Когда это будет сделано, контрольная точка обновляется до самой последней измененной даты из элементов, обработанных в этой итерации.

def saveLatest(saved_df: DataFrame, timeSeriesColName: String): Unit = {
val latestTime = saved_df.agg(max(timeSeriesColName)).collect()(0)
try {
  val timespanEnd = latestTime.getTimestamp(0).toInstant().toEpochMilli()
  saveTimestamp(timespanEnd) // this function actually stores the data
} catch {
  case e: java.lang.NullPointerException => {
    LoggingWrapper.log("timespanEnd is null");
  }
}

}

saveLatest(tryDownload, "modified")  

Меня беспокоит это решение limit (5000) , есть ли лучший способ, позволяющий сохранять хорошую производительность при загрузке указанного числа файлов в каждой итерации?

Спасибо за предложения заранее! :)

...