чтение из hdfs, запись в kafka с использованием scala spark, но получение NullPointerException - PullRequest
0 голосов
/ 17 октября 2019

Я хочу прочитать текстовый файл из hdf, используя spark rdd, и записать в kafka по foreach.Code следующим образом

  def main(args: Array[String]): Unit = {

    val kafkaItemProducerConfig = {
      val p = new Properties()
      p.put("bootstrap.servers", KAFKA_SERVER)
      p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      p.put("client.id", KAFKA_ITEM_CLIENT_ID)
      p.put("acks", "all")
      p
    }
    val conf: SparkConf = new SparkConf().setAppName("esc")
    val sc: SparkContext = new SparkContext(conf)
    kafkaItemProducer = new KafkaProducer[String, String](kafkaItemProducerConfig)
    if (kafkaItemProducer == null) {
      println("kafka config is error")
      sys.exit(1)
    }
    val dataToDmp = sc.textFile("/home/********/" + args(0) +"/part*")
    dataToDmp.foreach(x => {
      if (x != null && !x.isEmpty) {
        kafkaItemProducer.send(new ProducerRecord[String, String](KAFKA_TOPIC_ITEM, x.toString))
      }
    }
    )
    kafkaItemProducer.close()
  }

Я совершенно уверен, что KAFKA_SERVER и KAFKA_ITEM_CLIENT_ID и KAFKA_TOPIC_ITEM верны. Но этополучено сообщение об ошибке:

 ERROR ApplicationMaster [Driver]: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 12 in stage 0.0 failed 4 times, most recent failure: Lost task 12.3 in stage 0.0 (TID 18, tjtx148-5-173.58os.org, executor 1): java.lang.NullPointerException
    at esc.HdfsWriteToKafks$$anonfun$main$1.apply(HdfsWriteToKafks.scala:56)
    at esc.HdfsWriteToKafks$$anonfun$main$1.apply(HdfsWriteToKafks.scala:53)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:88)
    at org.apache.spark.scheduler.Task.run(Task.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:341)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Сказано:

    at esc.HdfsWriteToKafks$$anonfun$main$1.apply(HdfsWriteToKafks.scala:56)
    at esc.HdfsWriteToKafks$$anonfun$main$1.apply(HdfsWriteToKafks.scala:53)

Итак, в 56 строке есть ошибка

kafkaItemProducer.send(new ProducerRecord[String, String](KAFKA_TOPIC_ITEM, x.toString))

53 в строке

dataToDmp.foreach(

Я проверил содержимое dataToDmp, я использовал dataToDmp.take(100).foreach(println(_)) для проверки, и это правильно.

Есть ли какая-то ошибка в моем коде?

1 Ответ

0 голосов
/ 18 октября 2019

Это сработало. Я изменил, что я использовал метод foreachpartition вместо foreach. В каждом разделе я создавал продюсера. Код, как показано ниже:

  def main(args: Array[String]): Unit = {


    var kafkaItemProducer: KafkaProducer[String, String] = null

    val KAFKA_ITEM_CLIENT_ID = "a"
    val KAFKA_TOPIC_ITEM = "b"
    val KAFKA_SERVER = "host:port"

    val kafkaItemProducerConfig = {
      val p = new Properties()
      p.put("bootstrap.servers", KAFKA_SERVER)
      p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      p.put("client.id", KAFKA_ITEM_CLIENT_ID)
      p.put("acks", "all")
      p
    }
    val conf: SparkConf = new SparkConf().setAppName("esc")
    val sc: SparkContext = new SparkContext(conf)



    val dataToDmp = sc.textFile("/home/****/" + args(0) +"/part*",5)
    dataToDmp.foreachPartition(partition =>{
      val kafkaItemProducer = new KafkaProducer[String, String](kafkaItemProducerConfig)
      partition.foreach(
        x=>{ kafkaItemProducer.send(new ProducerRecord[String, String](KAFKA_TOPIC_ITEM, x.toString))
          Thread.sleep(100)
        }
      )
      kafkaItemProducer.close()
    })

  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...