Я выполняю потоковое задание с несколькими кэшами данных. Каждый час я пытаюсь очистить кеши, вызывая sparkSession.catalog.clearCache () внутри слушателя onQueryProgress (), проверяя текущую метку времени.
Хотя кеши корректно очищаются в нужное время, после очистки они не кэшируются для последующих микропакетов. Любая идея, в чем может быть причина.
Слушатель:
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
val now: Calendar = Calendar.getInstance()
println("Query made progress:" + Logger.GetLogDateTime(now.getTime()) + ":" + queryProgress.progress)
checkAndRefreshCaches(now, checkPointLocation, cacheReloadFrequencyInHours, fs, sparkSession)
queryProgress.progress.processedRowsPerSecond
}
Метод кэширования Refre sh:
private def checkAndRefreshCaches(now: Calendar, checkPointLocation: String, cacheReloadFrequencyInHours: Int, fs: FileSystem, sparkSession: SparkSession) = {
if (cacheReloadFrequencyInHours >= 1 && cacheReloadFrequencyInHours <= 24) {
val date = new SimpleDateFormat("yyyyMMdd").format(now.getTime())
val hour = now.get(Calendar.HOUR_OF_DAY)
val min = now.get(Calendar.MINUTE)
val sec = now.get(Calendar.SECOND)
val cacheCheckpointPath = new org.apache.hadoop.fs.Path("%s/cacheReload_%s_%s%s.done".format(checkPointLocation, date, hour, min))
if (!fs.exists(cacheCheckpointPath) && hour % cacheReloadFrequencyInHours == 0 && min == 0) {
println("Reloading caches at:" + Logger.GetLogDateTime(now.getTime()))
sparkSession.catalog.clearCache()
fs.create(cacheCheckpointPath)
}
}
}
Пожалуйста, дайте мне знать, если есть какие-либо проблемы с вышеуказанным подходом