NullPointerException при преобразовании сопоставленных данных в DataFrame - PullRequest
0 голосов
/ 29 мая 2019

Я пишу искровое приложение, которое берет данные транзакций из Hive и соединяет их с данными о местоположении из таблицы HBase. По сути, конечная цель - определить, где произошла транзакция, путем объединения lat и long из таблицы HBase в данные транзакции из Hive. Однако я получаю исключение NullPointerException при преобразовании объединенного набора данных в DataFrame.

Исключение появляется, когда я использую следующее:
.toDF ()
.createDataFrame ()
.parallize (.toSeq)

Сначала я подумал, что некоторые столбцы имеют нулевое значение, поэтому я использую Option (). ToString, чтобы убедиться, что нет нулевых значений, но ошибка все равно появляется, когда я вызываю вышеупомянутые 3 метода.

Я также могу подтвердить, что placeholder_Iterator.toStream не является нулевым, так как мне удается распечатать данные.

Я должен использовать foreachPartition, потому что getATMLocation () подключается к таблице hbase для получения lat и log. Ошибка сериализации произойдет, если я не использую foreachPartition. Ниже приведен код функции:

def getATMLocation(colFamily: String, search_item: String, table: Table) = {
    val scanner = new Scan()
    scanner
      .addColumn(colFamily.getBytes(), atm_dict_key.getBytes())
      .addColumn(colFamily.getBytes(), atm_dict_lat.getBytes())
      .addColumn(colFamily.getBytes(), atm_dict_long.getBytes())

    val filter = new SingleColumnValueFilter(colFamily.getBytes, atm_dict_key.getBytes(), CompareOp.EQUAL, Option(search_item).getOrElse("").toString.getBytes())
    scanner.setFilter(filter)

    val atm_locations = table.getScanner(scanner)

    val location = atm_locations.next()

    val longitude = location match {
      case null => null
      case _ => Option(Bytes.toString(location.getValue(colFamily.getBytes(), atm_dict_long.getBytes()))).getOrElse("")
    }

    val latitude = location match {
      case null => null
      case _ => Option(Bytes.toString(location.getValue(colFamily.getBytes(), atm_dict_lat.getBytes()))).getOrElse("")
    }

    atm_locations.close()

    (longitude, latitude)
  }

Ниже приведен проблемный код для вашей справки:

val max_records = sql(hive_query_1 + " " + period_clause.replace("|date|", "01-11-2018")).select("transac_count").as[String].collect()(0).toInt
      val max_page = math.ceil(max_records.toDouble/page_limit.toDouble).toInt

      val start_row = 0
      val end_row = page_limit.toInt

      if(max_records > 0) {
        for (page <- 0 to max_page - 1) {

          val hiveDF = sql("SELECT " + hive_columns + " FROM (" + (hive_query_2 + " " + period_clause.replace("|date|", "01-11-2018")
            ) + ") as trans_data WHERE rowid BETWEEN " + (start_row + (page * page_limit.toInt)).toString + " AND " + ((end_row + (page * page_limit.toInt)) - 1).toString)
            .withColumn("uuid", timeUUID())
            .withColumn("created_dt", current_timestamp())

          hiveDF.show()

          hiveDF.rdd.foreachPartition{ iter =>
            val hbaseconfig = HBaseConfiguration.create()
            hbaseconfig.set("keytab.file", keytab)
            val hbase_connection = ConnectionFactory.createConnection(hbaseconfig)
            val table = hbase_connection.getTable(TableName.valueOf(hbase_table))
            val regionLoc = hbase_connection.getRegionLocator(table.getName)
            val admin = hbase_connection.getAdmin

            val atm_dict_table = hbase_connection.getTable(TableName.valueOf(atm_dict_tbl))

            val placeholder_Iterator = iter.map(r => {
              val location = Query.getATMLocation(atm_dict_col_family, Option(r.get(14)).getOrElse("").toString, atm_dict_table)
              (Option(r.get(0)).toString, Option(r.get(1)).toString, Option(r.get(2)).toString, Option(r.get(3)).toString, Option(r.get(4)).toString, Option(r.get(5)).toString, Option(r.get(6)).toString, Option(r.get(7)).toString, Option(r.get(8)).toString, Option(r.get(9)).toString, Option(r.get(10)).toString, Option(r.get(11)).toString, Option(r.get(12)).toString, Option(r.get(13)).toString, Option(r.get(14)).toString, Option(r.get(15)).toString,  Option(r.get(16)).toString , Option(location._1).toString, Option(location._2).toString)
            })

            val test = placeholder_Iterator.toStream.toDF(new_column_names: _*)
            test.foreach(x => println(x))
          }
        }
      }

Ниже приведена ошибка, которая возвращается:

java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:228)
    at TransactionData$$anonfun$main$2$$anonfun$apply$1$$anonfun$apply$mcVI$sp$1.apply(TransactionData.scala:109)
    at TransactionData$$anonfun$main$2$$anonfun$apply$1$$anonfun$apply$mcVI$sp$1.apply(TransactionData.scala:94)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Я действительно надеюсь, что объединенные данные можно преобразовать в DataFrame, чтобы я мог записать их в HFile и массово вставить их в HBase

1 Ответ

0 голосов
/ 03 июня 2019

Я нашел ответ. Причина исключения нулевого указателя заключается в том, что кадры данных, rdd или наборы данных могут существовать только в драйвере. Этот пост объясняет это.

Spark: как создать локальный фрейм данных у каждого исполнителя

...