я должен закрыть файловую систему HDFS? - PullRequest
0 голосов
/ 02 апреля 2019

Моя искровая работа:

def saveCount(spark: SparkSession, cnt: Long): Unit = {
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val path = new org.apache.hadoop.fs.Path(s"hdfs://xxx/data/count")
  if(fs.exists(path)) fs.delete(path, true)

  val out = new BufferedOutputStream(fs.create(path))
  out.write(cnt.toString.getBytes("UTF-8"))
  out.flush()
  out.close()
  // fs.close()
}

main function {
  for-loop {
    val df = spark.sql("xxx").cache()
    val cnt = df.count()
    df.write.mode(SaveMode.Overwrite).json(s"yyy")
    saveCount(spark, cnt)
  }
}

Основная задача spark - задание: для цикла каждый раз запрашивается информационный фрейм из spark sql. кадр данных будет выгружен, а счет будет сохранен в файл с помощью функции saveCount().

Мой вопрос: мне звонить fs.close() в saveCount()? (<= Я думаю, ответ НЕТ) </p>

Мое беспокойство: Если он вызывается, это повлияет на дампы данных? Я нашел много java.io.IOException: Filesystem closed исключений в журналах заданий на работу

Спасибо

...