Моя искровая работа:
def saveCount(spark: SparkSession, cnt: Long): Unit = {
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val path = new org.apache.hadoop.fs.Path(s"hdfs://xxx/data/count")
if(fs.exists(path)) fs.delete(path, true)
val out = new BufferedOutputStream(fs.create(path))
out.write(cnt.toString.getBytes("UTF-8"))
out.flush()
out.close()
// fs.close()
}
main function {
for-loop {
val df = spark.sql("xxx").cache()
val cnt = df.count()
df.write.mode(SaveMode.Overwrite).json(s"yyy")
saveCount(spark, cnt)
}
}
Основная задача spark - задание: для цикла каждый раз запрашивается информационный фрейм из spark sql. кадр данных будет выгружен, а счет будет сохранен в файл с помощью функции saveCount()
.
Мой вопрос: мне звонить fs.close()
в saveCount()
? (<= Я думаю, ответ НЕТ) </p>
Мое беспокойство:
Если он вызывается, это повлияет на дампы данных? Я нашел много java.io.IOException: Filesystem closed
исключений в журналах заданий на работу
Спасибо