Error when writing RDD data to an Avro file
0 votes
/ 18 October 2018

Below is my RDD:

    // Build (movieID, movieName) pairs; strip the leading "[" left by toString()
    val title = movies.map(f => (f.toString().split("::")(0).replaceAll("\\[", "").trim(), f.toString().split("::")(1)))
    println(title.toDebugString)
    // Count ratings per movieID and keep the 10 most-viewed movies
    val view = ratings.map(f => (f.toString().split("::")(1).trim(), 1)).reduceByKey(_ + _).sortBy(f => f._2, false).take(10).toSeq
    val viewRDD = sc.parallelize(view)
    // Join the view counts back to the titles, keeping (movieName, viewCount)
    val join = title.join(viewRDD).map(f => (f._2._1, f._2._2))
    val dataRdd = join.map(row => (row._1, row._2))

I am trying to save dataRdd in Avro format, for which I use the saveAsNewAPIHadoopFile method. Below is how I save the RDD:

    dataRdd.saveAsNewAPIHadoopFile(
      "E:\\ml-1m\\ml-1m\\movieAvro2",
      classOf[AvroKey[_]],
      classOf[AvroValue[_]],
      classOf[AvroKeyValueOutputFormat[_, _]],
      sc.hadoopConfiguration)

When I run the program, I get the error below:

    java.lang.IllegalStateException: Writer schema for output key was not set. Use AvroJob.setOutputKeySchema().
    at org.apache.avro.hadoop.io.AvroDatumConverterFactory.create(AvroDatumConverterFactory.java:94)
    at org.apache.avro.mapreduce.AvroKeyValueOutputFormat.getRecordWriter(AvroKeyValueOutputFormat.java:55)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1119)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1102)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
18/10/18 11:27:46 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): java.lang.IllegalStateException: Writer schema for output key was not set. Use AvroJob.setOutputKeySchema().
...
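
The trace points at AvroKeyValueOutputFormat, which looks up the writer schemas in the Hadoop configuration; a bare sc.hadoopConfiguration never has them set, hence "Use AvroJob.setOutputKeySchema()". Below is a minimal sketch of what the save could look like, assuming the key is the movie name (String) and the value the view count (Int), as in dataRdd above. Registering the schemas through AvroJob and wrapping the pairs in AvroKey/AvroValue are the parts the original call omits; treat it as a sketch, not a tested answer for this exact data:

    import org.apache.avro.Schema
    import org.apache.avro.hadoop.io.{AvroKey, AvroValue}
    import org.apache.avro.mapreduce.{AvroJob, AvroKeyValueOutputFormat}
    import org.apache.hadoop.mapreduce.Job

    // Register the writer schemas on a Job so they travel in the configuration
    // that saveAsNewAPIHadoopFile receives; this is what the error asks for.
    val job = Job.getInstance(sc.hadoopConfiguration)
    AvroJob.setOutputKeySchema(job, Schema.create(Schema.Type.STRING))
    AvroJob.setOutputValueSchema(job, Schema.create(Schema.Type.INT))

    dataRdd
      .map { case (name, count) => (new AvroKey(name), new AvroValue(count)) } // wrap each pair in Avro containers
      .saveAsNewAPIHadoopFile(
        "E:\\ml-1m\\ml-1m\\movieAvro2",
        classOf[AvroKey[String]],
        classOf[AvroValue[Int]],
        classOf[AvroKeyValueOutputFormat[String, Int]],
        job.getConfiguration) // note: job.getConfiguration, not sc.hadoopConfiguration

The key detail is passing job.getConfiguration to the save call: AvroJob.setOutputKeySchema writes the schema into that configuration, so passing the untouched SparkContext configuration reproduces the IllegalStateException.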