Spark SQL two-phase query NPE - PullRequest
0 votes
/ April 10, 2020

I'm new to Spark (using 2.4.0). I've run into a strange (to me) NPE. The following code throws an NPE.

val ds = "2020-04-01"
spark.sql("select ds, db_name, table_name, type FROM datainfra.hive_tables " +
  "where ds = '%s' and db_name = 'db_exports' limit 1".format(ds)).map(table =>
  spark.sql("select col_name FROM datainfra.hive_columns " +
    "where ds = '%s' and db_name = '%s' and table_name = '%s' and table_type = '%s' and col_type = 'string'"
      .format(table.getAs[String]("ds"),
        table.getAs[String]("db_name"),
        table.getAs[String]("table_name"),
        table.getAs[String]("type")))
    .map(columnNameRow => columnNameRow.getAs[String](0)).collect().mkString("||")
)

But each of the DataFrames works fine on its own:

spark.sql("select ds, db_name, table_name, type FROM datainfra.hive_tables " +
  "where ds = '%s' and db_name = 'db_exports' limit 1".format(ds)).show // returns results

spark.sql("select col_name FROM datainfra.hive_columns " +
  ("where ds = '%s' and db_name = '%s' and table_name = '%s' and table_type = '%s' and col_type = 'string' " +
    "and col_name != 'ds'")
    .format(ds,
      "hardcode_db_name",
      "hardcode_table_name",
      "hardcode_type")).map(columnNameRow => columnNameRow.getAs[String](0)).collect().mkString("||")

How can this be?

1 Answer

1 vote
/ April 12, 2020

Q: I'm new to Spark (using 2.4.0). I've run into a strange (to me) NPE. The following code throws an NPE. How can this be?


The spark.sql("sql").map(row => spark.sql("another sql")) pattern is the problem: the function passed to map runs on the executors, where the SparkSession is null (it exists only on the driver), so the nested spark.sql call fails with a NullPointerException in SparkSession.sessionState, as the stack trace below shows.

In your case, this is what causes the null pointer exception:

    val ds = "2020-04-01"

    val test1: Dataset[String] = spark.sql("select ds, db_name, table_name, type FROM datainfra.hive_tables " +
      "where ds = '%s' and db_name = 'db_exports' limit 1".format(ds))
      .map(table =>
        spark.sql("select col_name FROM datainfra.hive_columns " +
          "where ds = '%s' and db_name = '%s' and table_name = '%s' and table_type = '%s' and col_type = 'string'"
            .format(table.getAs[String]("ds"),
              table.getAs[String]("db_name"),
              table.getAs[String]("table_name"),
              table.getAs[String]("type")))
          .map(columnNameRow => columnNameRow.getAs[String](0)).collect().mkString("||")
      )

To prove it, I prepared a similar example, see below. I reproduced the same null pointer exception, so it looks like this pattern is simply not supported.

package com.examples

import org.apache.log4j.Level
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Created by Ram Ghadiyaram
  */
object RDDOfTupleExample {
  org.apache.log4j.Logger.getLogger("org").setLevel(Level.ERROR)

  def main(args: Array[String]) {

    val spark = SparkSession.builder.
      master("local")
      .appName(this.getClass.getName)
      .getOrCreate()
    import spark.implicits._

    val donuts: DataFrame = Seq(("plain donut", 1.50), ("plain donut", 1.50)
      , ("vanilla donut", 2.0), ("vanilla donut", 2.0)
      , ("glazed donut", 2.50))
      .toDF("Donut_Name", "Price")

    //let's assume this is your Hive table; since I don't have Hive here, I simulated it with a temp view
    donuts.createOrReplaceTempView("mydonuts")
    val test: Dataset[String] = spark.sql("select \"NCA-15\" as mylabel, count(Donut_Name) as mydonutcount from mydonuts")
      .map(x => spark.sql(s"select ${x.get(0)}, ${x.get(1)} ").collect().mkString(",")) // this is the problem: nested spark.sql inside map
    test.show

  }
}

Result:

[2020-04-11 16:27:45,687] ERROR Exception in task 0.0 in stage 1.0 (TID 1) (org.apache.spark.executor.Executor:91)
java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-04-11 16:27:45,710] ERROR Task 0 in stage 1.0 failed 1 times; aborting job (org.apache.spark.scheduler.TaskSetManager:70)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:751)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:710)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:719)
    at com.examples.RDDOfTupleExample$.main(RDDOfTupleExample.scala:30)
    at com.examples.RDDOfTupleExample.main(RDDOfTupleExample.scala)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:143)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:141)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at com.examples.RDDOfTupleExample$$anonfun$1.apply(RDDOfTupleExample.scala:29)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.deserializetoobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)



Conclusion: the nested spark.sql pattern above does not work. You have to run the queries separately, or use another approach.
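For example, here is a minimal sketch of the "run them separately" approach, reusing the queries from the question (datainfra.hive_tables and datainfra.hive_columns are assumed to exist as in the original post): the outer query is collected to the driver first, and the inner queries are then issued from the driver in a plain Scala loop, so spark.sql is never called inside an executor-side lambda.

    val ds = "2020-04-01"

    // 1) Run the outer query and bring its (small, limit 1) result to the driver.
    val tables: Array[org.apache.spark.sql.Row] =
      spark.sql("select ds, db_name, table_name, type FROM datainfra.hive_tables " +
        "where ds = '%s' and db_name = 'db_exports' limit 1".format(ds)).collect()

    // 2) tables is a plain Scala Array, so this map runs on the driver,
    //    where the SparkSession is still valid.
    val columnLists: Array[String] = tables.map { table =>
      spark.sql("select col_name FROM datainfra.hive_columns " +
        "where ds = '%s' and db_name = '%s' and table_name = '%s' and table_type = '%s' and col_type = 'string'"
          .format(table.getAs[String]("ds"),
            table.getAs[String]("db_name"),
            table.getAs[String]("table_name"),
            table.getAs[String]("type")))
        .collect()
        .map(_.getString(0))
        .mkString("||")
    }

Alternatively, the same result can often be expressed as a single distributed query by joining datainfra.hive_tables with datainfra.hive_columns on the shared key columns (ds, db_name, table_name), which avoids pulling intermediate rows to the driver at all.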
