NullPointerException when reading from MySQL in Apache Spark with Scala
0 votes
/ 04 July 2019

I am trying to read data from MySQL, but it throws a NullPointerException and I am not sure what the cause is. The code in main.scala:

object main extends App {
  val dt = args.lift(0)
  if (dt.isEmpty || !PairingbatchUtil.validatePartitionDate(dt.get)) {
    throw new Exception("Partition date is mandatory or enter valid format 'yyyy-MM-dd'")
  }
  var mailProperties:Properties = new Properties
  var templateMappingData: Map[String, Object] = Map(
    "job" -> "Load merchant count Data from hdfs to mongo",
    "jobProcessedDate" -> dt.get,
    "batch" -> "Pairing Batch")
  val startTime = System.currentTimeMillis()

  try {

    val conf = new SparkConf().setAppName("read_from_mysql") //.setMaster("local")

    conf.set("spark.sql.warehouse.dir", "/user/local/warehouse/")
    conf.set("hive.exec.dynamic.partition", "true")
    conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1/db.table_name")
    conf.set("spark.mongodb.output.uri", "mongodb://127.0.0.1/db.table_name")
    val spark = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
    val schemaName = "/user/local/warehouse/"
    val aid = "1000"
    val resultPath = "/usr/local/proad" + "/" + dt.get

    val dbDataPartitionsMap = Map("aid" -> aid, "dt" -> dt.get)
    spark.sql("set aid=" + aid)
    spark.sql("set dt=" + dt.get)

    val configs = spark.sparkContext.getConf.getAll
    configs.foreach(i => println(i))
    val registerBaseTablesMap = Map(
      "DAILY_COUNT" -> ("SELECT * FROM " + schemaName + ".table_name WHERE aid = '${aid}' and dt ='${dt}'"),
      "DAILY_COUNT_FINAL" -> ("SELECT * FROM " + schemaName + ".second_table_name WHERE aid = '${aid}' and dt ='${dt}'"))

    val parentDF = PairingbatchUtil.readDataFromHive(registerBaseTablesMap.get("DAILY_COUNT").get, spark)

    val finalMerchantAffiliateDailyCountDF = Processor.process(parentDF, dbDataPartitionsMap, spark)
  }
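
A note on the '${aid}' and '${dt}' placeholders in registerBaseTablesMap: they are not Scala string interpolation (those strings are not s-interpolated) but Spark SQL variable substitution, driven by the earlier spark.sql("set aid=...") and spark.sql("set dt=...") calls. A minimal illustration of that mechanism, assuming spark.sql.variable.substitute is left at its default of true:

// Spark SQL variable substitution: ${aid} in the raw query text is replaced with the
// value registered via "set aid=..." before the statement is parsed, provided
// spark.sql.variable.substitute (default: true) is enabled.
spark.sql("set aid=1000")
spark.sql("set dt=2019-07-04")
spark.sql("SELECT '${aid}' AS aid, '${dt}' AS dt").show()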

The code in Processor.scala:

object Processor {


  case class MerchantDailyCount( _id: String, date: Date, totalClicks: String, totalLinks: String, shopUrl: String, shopUUID: String, shopName: String, publisherId: String)

  def process(parentDF: DataFrame, partitionsMap: Map[String, String], spark: SparkSession): DataFrame = {
    val schemaString = "_id date total_clicks total_links shop_url shop_uuid shop_name publisher_id"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)
    var finalDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

    parentDF.foreach(row => {
      if (parentDF == null || row.getAs("publisher_id") == null || StringUtils.isBlank(row.getAs("shop_uuid"))) {
      } else {
        val shopUUID = row.getAs("shop_uuid").toString
        val currentDate = row.getAs("cur_date").toString
        val date = PairingbatchUtil.parseDate(currentDate, Constants.DATE_FORMAT_YYYY_MM_DD, Constants.TAIWAN_TIMEZONE)
        val publisherId = row.getAs("publisher_id").toString
        val totalClicks = row.getAs("total_clicks").toString
        val totalLinks = row.getAs("total_links").toString
        val (shopUrl, shopName) = PairingbatchUtil.setShopUrlInfo(shopUUID, "com.mysql.jdbc.Driver", "user_mame", "password", s"""select shop_url, shop_name from db.table_name where shop_uuid ='$shopUUID'""", "shopUrl", spark)
        val id = PairingbatchUtil.isNeedToSet(spark, shopUrl, publisherId, date)
        val merchantDailyCount = MerchantDailyCount(id, date, totalClicks, totalLinks, shopUrl, shopUUID, shopName, publisherId)
        import spark.implicits._
        val merchantCountDF = Seq(merchantDailyCount).toDF()

        finalDF = finalDF.union(merchantCountDF)
      }
    })
    finalDF
  }
}

The code in PairingBatchUtil.scala:

def setShopUrlInfo(shopUUID: String, driverClass: String, user: String, pass: String, query: String, url: String, sparkSession: SparkSession)={

    val merchantDetailsDF = sparkSession.read //line no 139
      .format("jdbc")
      .option("url", url)
      .option("driver", driverClass)
      .option("dbtable",  s"( $query ) t")
      .option("user",user)
      .option("password", pass)
      .load()
    if (merchantDetailsDF.count() == 0) {
      ("INVALID SHOP URL","INVALID SHOP NAME")
    }else {
      (merchantDetailsDF.select(col = "shop_url").first().getAs("shop_url"),merchantDetailsDF.select(col = "shop_name").first().getAs("shop_name"))
    }
  }
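
For reference, the same lookup can be issued directly on the driver with an explicit JDBC connection string, which is a quick way to sanity-check the query and credentials in isolation. A minimal sketch, in which the JDBC URL, credentials and the shop_uuid literal are placeholders rather than values from this job:

// Driver-side sanity check of the same JDBC lookup; the URL, user, password and
// shop_uuid below are placeholders, not values taken from the original code.
val shopDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://127.0.0.1:3306/db")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "(select shop_url, shop_name from db.table_name where shop_uuid = 'some-shop-uuid') t")
  .option("user", "user_name")
  .option("password", "password")
  .load()
shopDF.show()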

I expect the output of the query to be:

+--------------+---------+
|      shop_url|shop_name|
+--------------+---------+
|     parimal  |   roy   |
+--------------+---------+

but the actual output is:

19/07/04 14:48:50 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:117)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:115)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:613)
    at com.rakuten.affiliate.order.pairing.batch.util.PairingbatchUtil$.setShopUrlInfo(PairingbatchUtil.scala:139)
    at com.rakuten.affiliate.order.pairing.batch.Processors.MechantAffDailyCountProcessor$$anonfun$process$1.apply(MechantAffDailyCountProcessor.scala:40)
    at com.rakuten.affiliate.order.pairing.batch.Processors.MechantAffDailyCountProcessor$$anonfun$process$1.apply(MechantAffDailyCountProcessor.scala:30)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

1 Answer

0 votes
/ 04 July 2019

Are you using Spark 2.1? In that case, I think you may have a configuration problem, as you can see in the source at line 117:

https://github.com/apache/spark/blob/branch-2.1/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
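
If it helps to narrow that down, both the running Spark version and the catalog implementation that the lazy sessionState resolution depends on can be printed on the driver before any job is submitted. A small sketch; the "in-memory" fallback used below is just the assumed default:

// Driver-side check of the Spark version and of spark.sql.catalogImplementation,
// the setting the sessionState class is resolved from ("hive" vs "in-memory").
println(s"Spark version: ${spark.version}")
println("Catalog implementation: " +
  spark.sparkContext.getConf.get("spark.sql.catalogImplementation", "in-memory"))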

...