Spark Streaming - кэширование не происходит после вызова clearCache () - PullRequest
0 голосов
/ 05 марта 2020

Я выполняю потоковое задание с несколькими кэшами данных. Каждый час я пытаюсь очистить кеши, вызывая sparkSession.catalog.clearCache () внутри слушателя onQueryProgress (), проверяя текущую метку времени.

Хотя кеши корректно очищаются в нужное время, после очистки они не кэшируются для последующих микропакетов. Любая идея, в чем может быть причина.

Слушатель:

 override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    val now: Calendar = Calendar.getInstance()
    println("Query made progress:" + Logger.GetLogDateTime(now.getTime()) + ":" + queryProgress.progress)
    checkAndRefreshCaches(now, checkPointLocation, cacheReloadFrequencyInHours, fs, sparkSession)
    queryProgress.progress.processedRowsPerSecond
  }

Метод кэширования Refre sh:

private def checkAndRefreshCaches(now: Calendar, checkPointLocation: String, cacheReloadFrequencyInHours: Int, fs: FileSystem, sparkSession: SparkSession) = {
    if (cacheReloadFrequencyInHours >= 1 && cacheReloadFrequencyInHours <= 24) {
      val date = new SimpleDateFormat("yyyyMMdd").format(now.getTime())
      val hour = now.get(Calendar.HOUR_OF_DAY)
      val min = now.get(Calendar.MINUTE)
      val sec = now.get(Calendar.SECOND)
      val cacheCheckpointPath = new org.apache.hadoop.fs.Path("%s/cacheReload_%s_%s%s.done".format(checkPointLocation, date, hour, min))

      if (!fs.exists(cacheCheckpointPath) && hour % cacheReloadFrequencyInHours == 0 && min == 0) {
        println("Reloading caches at:" + Logger.GetLogDateTime(now.getTime()))
        sparkSession.catalog.clearCache()
        fs.create(cacheCheckpointPath)
      }
    }
  }

Пожалуйста, дайте мне знать, если есть какие-либо проблемы с вышеуказанным подходом

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...