Как решить эту ошибку org.apache.spark.sql.catalyst.errors.package $ TreeNodeException - PullRequest
0 голосов
/ 26 октября 2018

У меня есть два процесса каждый процесс 1) подключить oracle db прочитать конкретную таблицу 2) сформировать датафрейм и обработать его. 3) сохранить df к Кассандре.

Если я запускаю оба процесса параллельно, оба пытаются прочитать из oracle и я получаю ошибку ниже, пока второй процесс читает данные

 ERROR ValsProcessor2: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#290L])
   +- *(1) Scan JDBCRelation((SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T) [numPartitions=2] [] PushedFilters: [], ReadSchema: struct<>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
at com.snp.processors.BenchmarkModelValsProcessor2.process(BenchmarkModelValsProcessor2.scala:43)
at com.snp.utils.Utils$$anonfun$getAllDefinedProcessors$2.apply(Utils.scala:28)
at com.snp.utils.Utils$$anonfun$getAllDefinedProcessors$2.apply(Utils.scala:28)
at com.sp.MigrationDriver$$anonfun$main$2$$anonfun$apply$1.apply(MigrationDriver.scala:78)
at com.sp.MigrationDriver$$anonfun$main$2$$anonfun$apply$1.apply(MigrationDriver.scala:78)
at scala.Option.map(Option.scala:146)
at com.sp.MigrationDriver$$anonfun$main$2.apply(MigrationDriver.scala:75)
at com.sp.MigrationDriver$$anonfun$main$2.apply(MigrationDriver.scala:74)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
at com.sp.MigrationDriver$.main(MigrationDriver.scala:74)
at com.sp.MigrationDriver.main(MigrationDriver.scala)
Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:163)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:300)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 37 more

Что я здесь не так делаю? Как это исправить?

Ответы [ 2 ]

0 голосов
/ 30 октября 2018

Я закрывал sparkSession в блоке finally в первом процессоре / вызываемом классе. Я переместил его из процессора и поместил в вызывающий класс, который решил проблему.

0 голосов
/ 28 октября 2018

Я не уверен в истинной причине, единственное, что бросилось в глаза, это следующее выражение SQL: (SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T - что означает T здесь?

Что касается общего дизайна, я быРекомендую совершенно другой подход.В вашем случае у вас есть 2 процессора, которые работают с одинаковыми данными, собранными из Oracle, и каждый процессор извлекает данные отдельно.Я бы порекомендовал перенести чтение данных Oracle в отдельную процедуру, которая будет возвращать фрейм данных (вам нужно его кешировать), и тогда ваши процессоры будут работать с этим фреймом данных и сохранять данные в Cassandra.

Или каккак было рекомендовано ранее, вы можете разделить задание на 2 части - одну, которая извлекает все данные из Oracle, и сохраняет кадр данных на диск (не persist, а используя write), например, в виде файла Parquet.А затем отдельное задание (я), которое будет принимать данные с диска и выполнять необходимые преобразования.

В обоих сценариях вы

...