Я пытаюсь прочитать данные JSON и явно указываю схему для данных.Но я получаю исключение при попытке выбрать столбец в кадре данных.
У меня есть следующие данные JSON:
{"col1":{"value":"test"},"col2":{"nested_col":{"nested_col2":{"value1":"12345","value2":"xyz"}}}}
{"col1":{"value":"test"},"col3":{"nested_col3_1":{"nested_col3_2":{"value3_1":"12345","value3_2":"xyz"}}}}
{"col1":{"value":"test"},"col2":{"nested_col":{"nested_col2":{"value1":"12345","value2":"xyz"}},"col3":{"nested_col3_1":{"nested_col3_2":{"value3_1":"12345","value3_2":"xyz"}}}}}
Я пытаюсь прочитать данные с помощью Spark.Схема выглядит следующим образом:
SparkSession spark = SparkSession.builder()
.appName("Java Spark SQL data source JSON example")
.master("local[2]").getOrCreate();
Dataset<Row> ds1 = spark.read().format("json").load("test.json");
List<StructField> fieldList = new ArrayList<StructField>();
List<StructField> col1List = new ArrayList<StructField>();
List<StructField> col2List = new ArrayList<StructField>();
List<StructField> nested_colList = new ArrayList<StructField>();
List<StructField> nested_col2List = new ArrayList<StructField>();
nested_col2List.add(DataTypes.createStructField("value1", DataTypes.StringType, true));
nested_col2List.add(DataTypes.createStructField("value2", DataTypes.StringType, true));
nested_colList.add(DataTypes.createStructField("nested_col2", DataTypes.createStructType(nested_col2List), true));
col2List.add(DataTypes.createStructField("nested_col", DataTypes.createStructType(nested_colList), true));
col1List.add(DataTypes.createStructField("value", DataTypes.StringType, true));
fieldList.add(DataTypes.createStructField("col1", DataTypes.createStructType(col1List), true));
fieldList.add(DataTypes.createStructField("col2", DataTypes.createStructType(col2List), true));
StructType schema = DataTypes.createStructType(fieldList);
Затем я пытаюсь выбрать столбец, используя следующий код:
Dataset<Row> df = spark.createDataFrame(ds1.javaRDD(), schema);
df.select(col("col1.value")).show();
Но я получаю следующее исключение:
Caused by: java.lang.RuntimeException: Error while encoding: java.lang.ArrayIndexOutOfBoundsException: 1
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else named_struct(value, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, col1), StructField(value,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, col1), StructField(value,StringType,true)), 0, value), StringType), true, false)) AS col1#10
if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else named_struct(nested_col, if (validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, col2), StructField(nested_col,StructType(StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)),true)).isNullAt) null else named_struct(nested_col2, if (validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, col2), StructField(nested_col,StructType(StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)),true)), 0, nested_col), StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)).isNullAt) null else named_struct(value1, if (validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, col2), StructField(nested_col,StructType(StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)),true)), 0, nested_col), StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)), 0, nested_col2), StructField(value1,StringType,true), StructField(value2,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, col2), StructField(nested_col,StructType(StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)),true)), 0, nested_col), StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)), 0, nested_col2), StructField(value1,StringType,true), StructField(value2,StringType,true)), 0, value1), StringType), true, false), value2, if (validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, col2), StructField(nested_col,StructType(StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)),true)), 0, nested_col), StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)), 0, nested_col2), StructField(value1,StringType,true), StructField(value2,StringType,true)).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, col2), StructField(nested_col,StructType(StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)),true)), 0, nested_col), StructField(nested_col2,StructType(StructField(value1,StringType,true), StructField(value2,StringType,true)),true)), 0, nested_col2), StructField(value1,StringType,true), StructField(value2,StringType,true)), 1, value2), StringType), true, false)))) AS col2#11
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:291)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at org.apache.spark.sql.SparkSession$$anonfun$4.apply(SparkSession.scala:593)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:363)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:363)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.spark.sql.catalyst.expressions.GenericRow.get(rows.scala:174)
at org.apache.spark.sql.Row$class.isNullAt(Row.scala:191)
at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:166)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.If_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:288)
... 23 more
18/11/30 01:24:36 INFO SparkContext: Invoking stop() from shutdown hook
Как я могу решить эту проблему?
ОБНОВЛЕНИЕ
Данные не структурированы, как показано выше