Я пытаюсь проверить некоторые данные в новом файле hdfs при обработке текстового файла с помощью функции rdd.map.
Ниже приведен рабочий код.
object data_store {
def main(argsi: Array[String]): Unit = {
. . . .
. . . .
// Set-up checkpoint file
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val output = fs.create(new Path(rejfile))
val ck = new BufferedOutputStream(output)
ck.write("Check pointing.. ")
val mstr_rdd = spark.sparkContext.textFile(infile)
val mstr_recs = mstr_rdd.map(line => convert_data(line) )
mstr_recs.saveAsTextFile(outfile)
ck.close()
} // end of main
def convert_data(line:String):String={
val HDR = line.slice(0,4)
val ACC = line.slice(4,16)
}
Однако, если я изменяю, как показано ниже, выдается ошибка Task Not Serializable
. . . . .
. . . . .
val mstr_rdd = spark.sparkContext.textFile(infile)
val mstr_recs = mstr_rdd.map(line => convert_data(line,ck) ) // <==Changed
mstr_recs.saveAsTextFile(outfile)
ck.close()
} // end of main
def convert_data(line:String,ck:java.io.BufferedOutputStream ):String={ // <==Changed
val HDR = line.slice(0,4)
val ACC = line.slice(4,16)
ck.write(ACC + " mapped") // <==New
}
Как это исправить ?.