Ниже приведён код, которым я строю свой RDD:
// Pair every movie record with its ID: (movieID, movieName).
// The record is split on "::" once; a leading '[' is stripped from the ID field.
val title = movies.map { movie =>
  val fields = movie.toString().split("::")
  val movieId = fields(0).replaceAll("\\[", "").trim()
  (movieId, fields(1))
}
println(title.toDebugString)

// Count the ratings per key (second "::"-separated field, joined against the
// movie IDs below) and keep the ten keys with the highest counts.
val view = ratings
  .map { rating => (rating.toString().split("::")(1).trim(), 1) }
  .reduceByKey((left, right) => left + right)
  .sortBy(_._2, ascending = false)
  .take(10)
  .toSeq
val viewRDD = sc.parallelize(view)

// Join on the movie ID and keep only (movieName, viewCount).
val join = title.join(viewRDD).map { case (_, (movieName, viewCount)) => (movieName, viewCount) }
val dataRdd = join.map { case (movieName, viewCount) => (movieName, viewCount) }
Я пытаюсь сохранить dataRdd в формате Avro, для чего использую метод saveAsNewAPIHadoopFile.
Ниже показано, как я сохраняю RDD:
// Write dataRdd as Avro key/value output through the new Hadoop API.
// NOTE(review): no Avro writer schema is registered on the configuration passed
// here; the IllegalStateException quoted below asks for AvroJob.setOutputKeySchema().
dataRdd.saveAsNewAPIHadoopFile(
  "E:\\ml-1m\\ml-1m\\movieAvro2",
  classOf[AvroKey[_]],
  classOf[AvroValue[_]],
  classOf[AvroKeyValueOutputFormat[_, _]],
  sc.hadoopConfiguration
)
Когда я запускаю программу, я получаю сообщение об ошибке ниже:
java.lang.IllegalStateException: Writer schema for output key was not set. Use AvroJob.setOutputKeySchema().
at org.apache.avro.hadoop.io.AvroDatumConverterFactory.create(AvroDatumConverterFactory.java:94)
at org.apache.avro.mapreduce.AvroKeyValueOutputFormat.getRecordWriter(AvroKeyValueOutputFormat.java:55)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
18/10/18 11:27:46 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): java.lang.IllegalStateException: Writer schema for output key was not set. Use AvroJob.setOutputKeySchema().
at org.apache.avro.hadoop.io.AvroDatumConverterFactory.create(AvroDatumConverterFactory.java:94)
at org.apache.avro.mapreduce.AvroKeyValueOutputFormat.getRecordWriter(AvroKeyValueOutputFormat.java:55)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)