Я хочу прочитать текстовый файл из hdf, используя spark rdd, и записать в kafka по foreach.Code следующим образом
def main(args: Array[String]): Unit = {
val kafkaItemProducerConfig = {
val p = new Properties()
p.put("bootstrap.servers", KAFKA_SERVER)
p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
p.put("client.id", KAFKA_ITEM_CLIENT_ID)
p.put("acks", "all")
p
}
val conf: SparkConf = new SparkConf().setAppName("esc")
val sc: SparkContext = new SparkContext(conf)
kafkaItemProducer = new KafkaProducer[String, String](kafkaItemProducerConfig)
if (kafkaItemProducer == null) {
println("kafka config is error")
sys.exit(1)
}
val dataToDmp = sc.textFile("/home/********/" + args(0) +"/part*")
dataToDmp.foreach(x => {
if (x != null && !x.isEmpty) {
kafkaItemProducer.send(new ProducerRecord[String, String](KAFKA_TOPIC_ITEM, x.toString))
}
}
)
kafkaItemProducer.close()
}
Я совершенно уверен, что KAFKA_SERVER и KAFKA_ITEM_CLIENT_ID и KAFKA_TOPIC_ITEM верны. Но этополучено сообщение об ошибке:
ERROR ApplicationMaster [Driver]: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 0.0 failed 4 times, most recent failure: Lost task 12.3 in stage 0.0 (TID 18, tjtx148-5-173.58os.org, executor 1): java.lang.NullPointerException
at esc.HdfsWriteToKafks$$anonfun$main$1.apply(HdfsWriteToKafks.scala:56)
at esc.HdfsWriteToKafks$$anonfun$main$1.apply(HdfsWriteToKafks.scala:53)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
at org.apache.spark.scheduler.Task.run(Task.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Сказано:
at esc.HdfsWriteToKafks$$anonfun$main$1.apply(HdfsWriteToKafks.scala:56)
at esc.HdfsWriteToKafks$$anonfun$main$1.apply(HdfsWriteToKafks.scala:53)
Итак, в 56 строке есть ошибка
kafkaItemProducer.send(new ProducerRecord[String, String](KAFKA_TOPIC_ITEM, x.toString))
53 в строке
dataToDmp.foreach(
Я проверил содержимое dataToDmp, я использовал dataToDmp.take(100).foreach(println(_))
для проверки, и это правильно.
Есть ли какая-то ошибка в моем коде?