spark - как избежать ошибки сериализации при проверке данных в функции rdd map - PullRequest
0 голосов
/ 30 января 2019

Я пытаюсь проверить некоторые данные в новом файле hdfs при обработке текстового файла с помощью функции rdd.map.

Ниже приведен рабочий код.

object data_store {

def main(argsi: Array[String]): Unit = {
. . . . 
. . . .
// Set-up checkpoint file

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val output = fs.create(new Path(rejfile))
val ck = new BufferedOutputStream(output)

ck.write("Check pointing.. ")

val mstr_rdd = spark.sparkContext.textFile(infile)
val mstr_recs = mstr_rdd.map(line => convert_data(line) )
mstr_recs.saveAsTextFile(outfile)

ck.close()
} // end of main

def convert_data(line:String):String={
  val HDR = line.slice(0,4)
  val ACC = line.slice(4,16)
}

Однако, если я изменяю, как показано ниже, выдается ошибка Task Not Serializable

. . . . .
. . . . .
val mstr_rdd = spark.sparkContext.textFile(infile)
val mstr_recs = mstr_rdd.map(line => convert_data(line,ck) )  // <==Changed
mstr_recs.saveAsTextFile(outfile)

ck.close()
} // end of main

def convert_data(line:String,ck:java.io.BufferedOutputStream ):String={  // <==Changed
  val HDR = line.slice(0,4)
  val ACC = line.slice(4,16)
  ck.write(ACC + " mapped") // <==New 
}

Как это исправить ?.

...