Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`temp`' given input columns: [testdata.value]; line 1 pos 7;

I am trying to parse data from Kafka with Spark Structured Streaming. The code looks like this:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import static org.apache.spark.sql.functions.col;

SparkSession spark = SparkSession.builder()
            .appName("Simple Application")
            .master("local[*]")
            .getOrCreate();
Dataset<Row> df = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "10.135.80.34:61276")
            .option("subscribe", "test")
            .option("startingOffsets", "earliest")
            .load();
Dataset<Row> jsonDf = df.select(col("value").cast("string")).as("data");
jsonDf.select("data.*").createOrReplaceTempView("testData");
Dataset<Row> queryData = spark.sql("select temp from testData");
StreamingQuery query = queryData.writeStream()
            .format("console")
            .start();
query.awaitTermination();

The data in Kafka looks like this:

{"temp":15, "hum":20, "env":{"lux":100, "geo":22}}

But I got this exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`temp`' given input columns: [testdata.value]; line 1 pos 7;
'Project ['temp]
+- SubqueryAlias `testdata`
   +- Project [value#21]
      +- SubqueryAlias `data`
         +- Project [cast(value#8 as string) AS value#21]
            +- StreamingRelationV2 
org.apache.spark.sql.kafka010.KafkaSourceProvider@59845579, kafka, Map(startingOffsets -> earliest, subscribe -> iot_test2, kafka.bootstrap.servers -> 10.135.80.34:61276,10.135.80.34:61277,10.135.80.34:61278), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@71664560,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> iot_test2, kafka.bootstrap.servers -> 10.135.80.34:61276,10.135.80.34:61277,10.135.80.34:61278),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:110)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$$nestedInanonfun$checkAnalysis$1$2.applyOrElse(CheckAnalysis.scala:107)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$2(TreeNode.scala:278)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:278)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUp$1(QueryPlan.scala:93)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:105)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:116)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:121)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.immutable.List.foreach(List.scala:388)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.immutable.List.map(List.scala:294)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:121)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:126)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:126)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:93)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:107)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1$adapted(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:85)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:82)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:95)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:108)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:79)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
at com.sparktest.iot.testmain.main(testmain.java:64)
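
Judging by the analyzed plan above, the temp view only contains the single string column value (the cast of the raw Kafka payload), so there is no temp column to resolve. Printing the schema of jsonDf presumably shows just that (my assumption from the plan, not verified output):

jsonDf.printSchema();
// root
//  |-- value: string (nullable = true)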

I also tried reading the data with a StructType:

import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.from_json;

StructType sensorSchema = new StructType().add("temp", "integer")
            .add("hum", "integer")
            .add("env", new StructType()
                    .add("lux", "integer")
                    .add("geo", "integer"));
Dataset<Row> jsonDf = df.select(from_json(col("value").cast("string"), sensorSchema)).as("data");

But I got a similar exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`temp`' given input columns: [testdata.jsontostructs(CAST(value AS STRING))]; line 1 pos 7;
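
From the column name jsontostructs(CAST(value AS STRING)) in the message, my guess is that .as("data") aliases the whole Dataset rather than naming the struct column produced by from_json. A minimal sketch of what I suspect is needed instead (untested; it moves the alias onto the column and then expands the struct fields):

// Alias the parsed struct column itself, then expand its fields
// so that temp, hum and env become top-level columns.
Dataset<Row> jsonDf = df
        .select(from_json(col("value").cast("string"), sensorSchema).as("data"))
        .select("data.*");
jsonDf.createOrReplaceTempView("testData");
Dataset<Row> queryData = spark.sql("select temp from testData");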

Can anyone please help me with this? Many thanks.

...