I'm very new to Apache Spark and am trying to inner join a books table with a table that has book_id along with read_count. The goal is to generate a table of book titles and their corresponding read counts.
To start with, I have a booksRead table that contains records of users reading books. I group it by book_id and order the groups by how often each book was read:
booksReadDF.groupBy($"book_id").agg(count("book_id").as("read_count"))
  .orderBy($"read_count".desc)
  .show()
+-------+----------+
|book_id|read_count|
+-------+----------+
|   8611|       565|
|     14|       436|
|  11850|       394|
|     15|       357|
|  11803|       324|
+-------+----------+
only showing top 5 rows
and I am trying to inner join it with the books table, which looks like this:
+------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+---------+----+--------------+------------+--------------------+--------------------+--------+--------+-----------------+---------------+----------------+--------+----------+
|    id|          created_at|          updated_at|               title|      isbn_13|   isbn_10|           image_url|         description|           publisher|author_id|year|overall_rating|audible_link|          google_url|         query_title|category|language|number_of_reviews|waterstone_link|amazon_available|is_ebook|page_count|
+------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+---------+----+--------------+------------+--------------------+--------------------+--------+--------+-----------------+---------------+----------------+--------+----------+
|115442|2018-07-25 00:59:...|2018-07-25 00:59:...|Representation of...|9781361479278|1361479272|http://books.goog...|This dissertation...|Open Dissertation...|    62130|2017|          null|        null|http://books.goog...|representation of...|        |      en|                0|           null|            true|   false|      null|
|115450|2018-07-25 00:59:...|2018-07-25 00:59:...|Imag(in)ing the W...|9789004182981|9004182985|http://books.goog...|This study examin...|               BRILL|    73131|2010|          null|        null|http://books.goog...|imagining the war...|        |      en|                0|           null|            true|   false|      null|
|218332|2018-08-19 14:48:...|2018-08-19 14:48:...|My Life With Tibe...|9781462802357|1462802354|http://books.goog...|Your child is a m...| Xlibris Corporation|   118091|2008|          null|        null|https://play.goog...|my life with tibe...|        |      en|                0|           null|            true|   false|      null|
|186991|2018-08-11 11:08:...|2018-08-11 11:08:...|  NOT "Just Friends"|9781416586401|1416586407|http://books.goog...|One of the world’...|  Simon and Schuster|     7687|2007|          null|        null|https://play.goog...|    not just friends|        |      en|                0|           null|            true|   false|      null|
|247317|2018-09-06 08:23:...|2018-09-06 08:23:...|OCR AS and A Leve...|9781910523056|1910523054|https://images-eu...|A complete course...|   PG Online Limited|   128220|2016|          null|        null|                null|ocr as and a leve...|    null| English|             null|           null|            true|   false|      null|
+------+--------------------+--------------------+--------------------+-------------+----------+--------------------+--------------------+--------------------+---------+----+--------------+------------+--------------------+--------------------+--------+--------+-----------------+---------------+----------------+--------+----------+
only showing top 5 rows
where book_id is matched against id in the books table, using this command:
booksReadDF.groupBy($"book_id").agg(count("book_id").as("read_count"))
  .orderBy($"read_count".desc)
  .join(booksDF, booksReadDF.col("book_id") === booksDF.col("id"), "inner")
  .show()
but I get this error:
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(book_id#4, 200)
+- *(3) Sort [read_count#67L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(read_count#67L DESC NULLS LAST, 200)
      +- *(2) HashAggregate(keys=[book_id#4], functions=[count(book_id#4)], output=[book_id#4, read_count#67L])
         +- Exchange hashpartitioning(book_id#4, 200)
            +- *(1) HashAggregate(keys=[book_id#4], functions=[partial_count(book_id#4)], output=[book_id#4, count#215L])
               +- *(1) Scan JDBCRelation(books_readbook) [numPartitions=1] [book_id#4] PushedFilters: [*IsNotNull(book_id)], ReadSchema: struct<book_id:int>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.doExecute(WholeStageCodegenExec.scala:383)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.joins.SortMergeJoinExec.inputRDDs(SortMergeJoinExec.scala:386)
at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
at DBConn$.main(DBConn.scala:36)
at DBConn.main(DBConn.scala)
Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange rangepartitioning(read_count#67L DESC NULLS LAST, 200)
+- *(2) HashAggregate(keys=[book_id#4], functions=[count(book_id#4)], output=[book_id#4, read_count#67L])
   +- Exchange hashpartitioning(book_id#4, 200)
      +- *(1) HashAggregate(keys=[book_id#4], functions=[partial_count(book_id#4)], output=[book_id#4, count#215L])
         +- *(1) Scan JDBCRelation(books_readbook) [numPartitions=1] [book_id#4] PushedFilters: [*IsNotNull(book_id)], ReadSchema: struct<book_id:int>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 52 more
Caused by: java.lang.IllegalArgumentException: Unsupported class file major version 56
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:166)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:148)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:136)
at org.apache.xbean.asm6.ClassReader.<init>(ClassReader.java:237)
at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:49)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:517)
at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.scala:500)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:134)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:134)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:500)
at org.apache.xbean.asm6.ClassReader.readCode(ClassReader.java:2175)
at org.apache.xbean.asm6.ClassReader.readMethod(ClassReader.java:1238)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:631)
at org.apache.xbean.asm6.ClassReader.accept(ClassReader.java:355)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:307)
at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(ClosureCleaner.scala:306)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:306)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2100)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:309)
at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:171)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:224)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 72 more
19/06/17 16:21:47 INFO SparkContext: Invoking stop() from shutdown hook
19/06/17 16:21:47 INFO SparkUI: Stopped Spark web UI at http://10.245.65.12:4040
19/06/17 16:21:47 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/06/17 16:21:47 INFO MemoryStore: MemoryStore cleared
19/06/17 16:21:47 INFO BlockManager: BlockManager stopped
19/06/17 16:21:48 INFO BlockManagerMaster: BlockManagerMaster stopped
19/06/17 16:21:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/06/17 16:21:48 INFO SparkContext: Successfully stopped SparkContext
19/06/17 16:21:48 INFO ShutdownHookManager: Shutdown hook called
19/06/17 16:21:48 INFO ShutdownHookManager: Deleting directory /private/var/folders/ql/dpk0v2gs15z83pvwt_g3n7lh0000gn/T/spark-9368b8cb-0cf6-45a5-9548-a9c1975dab46
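
In case it helps with diagnosis: the innermost cause in the trace is "Unsupported class file major version 56", and as far as I can tell major version 56 corresponds to Java 12. Below is a minimal sketch for checking which JVM the driver is actually running on. The object name JvmCheck and the helper javaReleaseFor are hypothetical names made up for this illustration; only the two system properties are standard.

```scala
object JvmCheck {
  // For Java 5 and later, the class file major version is the release number plus 44
  // (52 for Java 8, 55 for Java 11, 56 for Java 12).
  def javaReleaseFor(classFileMajor: Int): Int = classFileMajor - 44

  def main(args: Array[String]): Unit = {
    // Version of the JVM this program is running on.
    println(s"java.version       = ${System.getProperty("java.version")}")
    // Highest class file version this JVM understands, e.g. "52.0" on Java 8.
    println(s"java.class.version = ${System.getProperty("java.class.version")}")
    // The version mentioned in the stack trace above.
    println(s"class file major 56 -> Java ${javaReleaseFor(56)}")
  }
}
```

Running this with the same JDK that launches DBConn should show whether the job is running on a newer Java than the Spark version in use supports.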