Простое объединение Apache Spark приводит к загадочной ошибке - PullRequest
0 голосов
/ 09 октября 2018

У меня есть два набора данных, которые я могу запросить и показать () по отдельности.У одной есть 17 записей, а у другой 3.

Dataset<Row> attReader = spark
    .read()
    .format("org.apache.spark.sql.cassandra")
    .option("table", "table_1")
    .load();

Dataset<Row> surReader = spark
    .read()
    .format("org.apache.spark.sql.cassandra")
    .option("table", "table_2")
    .load();

Когда я пытаюсь присоединиться и показать их так:

    Dataset<Row> joined = attReader.join(surReader,
        attReader.col("key_field").equalTo(surReader.col("key_field")), "inner");
    joined.show();

Я уверен, что эти поля верны, так как я могу показатьданные для отдельных наборов данных и посмотреть их.Поля объединения являются строками.

Я получаю следующее исключение, которое не очень помогает:

org.apache.spark.SparkException: Exception thrown in awaitResult: 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:135)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:232)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:102)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
    at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:85)
    at org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:206)
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:181)
    at org.apache.spark.sql.execution.RowDataSourceScanExec.consume(DataSourceScanExec.scala:77)
    at org.apache.spark.sql.execution.RowDataSourceScanExec.doProduce(DataSourceScanExec.scala:125)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.RowDataSourceScanExec.produce(DataSourceScanExec.scala:77)
    at org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:125)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:85)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:97)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:39)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:45)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:88)
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:83)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:35)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:524)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:576)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:337)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3278)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2489)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2703)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:723)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:682)
    at org.apache.spark.sql.Dataset.show(Dataset.scala:691)
    at com.kilonova.CassandraStream.sparkSql(CassandraStream.java:111)
    at com.kilonova.CassandraStream.init(CassandraStream.java:174)
    at com.kilonova.Main.runStream(Main.java:20)
    at com.kilonova.Main.main(Main.java:14)
Caused by: java.lang.IllegalArgumentException
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:449)
    at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:432)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
    at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:432)
    at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:262)
    at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:261)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:261)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2073)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectIterator(SparkPlan.scala:304)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:76)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1$$anonfun$apply$1.apply(BroadcastExchangeExec.scala:73)
    at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anonfun$relationFuture$1.apply(BroadcastExchangeExec.scala:72)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:844)

Ответы [ 2 ]

0 голосов
/ 10 октября 2018

Это может произойти из-за версии JDK.Возможно, вы не используете JDK 8.

Пожалуйста, проверьте эту тему для получения дополнительной информации.

Эта тема может помочь. Исключительная Java IllegalArgumentException на org.apache.xbean.asm5.ClassReader

0 голосов
/ 09 октября 2018

Spark пытается передать одну из таблиц меньшего размера в объединении, если ее размер меньше значения, указанного в spark.sql.autoBroadcastJoinThreshold .Это свойство по умолчанию установлено в 10 МБ.

Одна из причин, по которой вы получите эту ошибку, заключается в тайм-ауте вещания.Это свойство контролируется spark.sql.broadcastTimeout и по умолчанию имеет значение 300 секунд.

Чтобы решить вашу проблему, вы можете сделать одно из следующих действий:

  1. Отключить трансляцию, установив spark.sql.autoBroadcastJoinThreshold в -1.Но это может повлиять на производительность запросов.
  2. Увеличьте значение тайм-аута, если вы хотите транслировать таблицу.

Я видел случаи, когда у Spark нет статистики таблицы, и он может пытаться транслировать большее из двух значений.таблицы в соединении.Это ухудшит производительность.Вы можете определить эту проблему, получив план запроса и выяснив, какая таблица транслируется.

dataframe.queryExecution.sparkPlan

Если вы обнаружите, что искра передает более крупную таблицу, то вам лучше рассчитать статистику для этой таблицы.оптимизация запросов.Вы можете пойти одним из следующих способов:

  1. Запустить команду анализа таблицы через Spark.Если у вас запущена служба Hive / Impala, использующая то же самое хранилище метастазов, что и Spark, вы можете запустить анализ таблицы через Hive или вычислить статистику через Impala.

  2. Вы можете кэшировать таблицу, прежде чем выполнять ее объединение.Таким образом, Spark вычисляет статистику для таблицы на лету.

Для дальнейшего ознакомления со статистикой таблицы анализа:

Дайте мне знать, помогло ли это вам.

...