У меня есть следующий фрагмент scala, который отражает то, что я делал в spark 2.1.1:
val headers = Seq(StructField("A", StringType), StructField("B", StringType), StructField("C", StringType))
val data = Seq(Seq("A1", "B1", "C1"), Seq("A2", "B2", "C2"), Seq("A3", "B3", "C3"))
val rdd = sc.parallelize(data).map(Row.fromSeq)
sqlContext.createDataFrame(rdd, StructType(headers)).registerTempTable("TEMP_DATA")
val table = sqlContext.table("TEMP_DATA")
table
.select("A")
.filter(table("B") === "B1")
.show()
В 2.3.1 выдается следующая ошибка:
Resolved attribute(s) B#1604 missing from A#1603 in operator !Filter (B#1604
= B1).;;
!Filter (B#1604 = B1)
+- AnalysisBarrier
+- Project [A#1603]
+- SubqueryAlias temp_data
+- LogicalRDD [A#1603, B#1604, C#1605], false
org.apache.spark.sql.AnalysisException: Resolved attribute(s) B#1604 missing from A#1603 in operator !Filter (B#1604 = B1).;;
!Filter (B#1604 = B1)
+- AnalysisBarrier
+- Project [A#1603]
+- SubqueryAlias temp_data
+- LogicalRDD [A#1603, B#1604, C#1605], false
Я могу это исправить, если поменяю местами select
и filter
.Мой вопрос: почему это изменилось?Мне нужно объяснить, почему это произошло, и в идеале связать их с документацией, подтверждающей это.
Насколько я понимаю, селектор возвращает фрейм данных, который функционально содержит только столбец A
, поэтому вы не можете фильтроватьна B
.Я пытался воссоздать эту проблему в pyspark, но, похоже, она отлично работает там.
Вот трассировка стека:
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:289)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:80)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:172)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:178)
at org.apache.spark.sql.Dataset$.apply(Dataset.scala:65)
at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:3301)
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1458)