As a preface: I am using pyspark==2.4.5.

When reading the JSON file that can be found here: https://filebin.net/53rhhigep2zpqdga, I need to explode the data column. After the explode I no longer need data, and I do not need statistics either.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master('local[2]').appName("createDataframe")\
    .getOrCreate()
json_data = spark.read.option('multiline', True).json(file_name)
json_data = json_data.withColumn("data_values", F.explode_outer("data"))\
    .drop("data", "statistics")
Below are the schema and the top 5 rows of json_data:
root
|-- data_values: struct (nullable = true)
| |-- date: string (nullable = true)
| |-- events: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- active: long (nullable = true)
| | | |-- index: long (nullable = true)
| | | |-- mode: long (nullable = true)
| | | |-- rate: long (nullable = true)
| | | |-- timestamp: string (nullable = true)
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|data_values |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|[2019-02-20, [[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]] |
|[2019-02-21, [[1, 0, 1, 0, 2019-02-21T00:03:00], [0, 1, 1, 0, 2019-02-21T00:04:00], [1, 2, 1, 1, 2019-02-21T00:05:00], [1, 3, 1, 1, 2019-02-21T00:06:00]]]|
|[2019-02-22, [[1, 0, 1, 0, 2019-02-22T00:03:00], [0, 1, 1, 0, 2019-02-22T00:04:00], [1, 2, 1, 1, 2019-02-22T00:05:00], [1, 3, 1, 1, 2019-02-22T00:06:00]]]|
|[2019-02-23, [[1, 3, 1, 1, 2019-02-23T00:16:00]]] |
|[2019-02-24, [[1, 0, 1, 1, 2019-02-24T00:03:00], [1, 1, 1, 0, 2019-02-24T00:04:00]]] |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
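To make the intended transformation concrete, here is a plain-Python sketch (no Spark) of the per-date/per-event flattening I am trying to achieve. The sample record is made up, but it mirrors the structure of the real file shown above:

```python
import json

# A made-up record mirroring the real file's structure: each entry in
# "data" carries a date plus a list of events.
sample = json.loads("""
{
  "data": [
    {"date": "2019-02-20",
     "events": [
       {"active": 0, "index": 0, "mode": 1, "rate": 0, "timestamp": "2019-02-20T00:00:00"},
       {"active": 0, "index": 1, "mode": 1, "rate": 0, "timestamp": "2019-02-20T00:01:00"}
     ]}
  ],
  "statistics": {}
}
""")

# The "explode" I am after: one output row per (date, event) pair,
# with "data" and "statistics" dropped afterwards.
rows = [
    {**event, "date": day["date"]}
    for day in sample["data"]
    for event in day["events"]
]

for r in rows:
    print(r["date"], r["timestamp"], r["active"])
```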
Now, to get the data I need, I run the following queries:
newData = json_data\
.withColumn("events", F.explode(json_data.data_values.events))\
.withColumn("date", json_data.data_values.date)
newData.printSchema()
newData.show(3)
finalData = newData.drop("data_values")
finalData.show(6)
Above, you can see that I create a column named data_values, which explodes my incoming JSON data. I then create columns to extract the events and the date from data_values. Below you will see what the resulting schema looks like, along with the top 5 rows.
root
|-- data_values: struct (nullable = true)
| |-- date: string (nullable = true)
| |-- events: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- active: long (nullable = true)
| | | |-- index: long (nullable = true)
| | | |-- mode: long (nullable = true)
| | | |-- rate: long (nullable = true)
| | | |-- timestamp: string (nullable = true)
|-- events: struct (nullable = true)
| |-- active: long (nullable = true)
| |-- index: long (nullable = true)
| |-- mode: long (nullable = true)
| |-- rate: long (nullable = true)
| |-- timestamp: string (nullable = true)
|-- date: string (nullable = true)
+----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------+----------+
|data_values |events |date |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------+----------+
|[2019-02-20, [[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]] |[0, 0, 1, 0, 2019-02-20T00:00:00]|2019-02-20|
|[2019-02-20, [[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]] |[0, 1, 1, 0, 2019-02-20T00:01:00]|2019-02-20|
|[2019-02-20, [[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]] |[0, 2, 1, 0, 2019-02-20T00:02:00]|2019-02-20|
|[2019-02-21, [[1, 0, 1, 0, 2019-02-21T00:03:00], [0, 1, 1, 0, 2019-02-21T00:04:00], [1, 2, 1, 1, 2019-02-21T00:05:00], [1, 3, 1, 1, 2019-02-21T00:06:00]]]|[1, 0, 1, 0, 2019-02-21T00:03:00]|2019-02-21|
|[2019-02-21, [[1, 0, 1, 0, 2019-02-21T00:03:00], [0, 1, 1, 0, 2019-02-21T00:04:00], [1, 2, 1, 1, 2019-02-21T00:05:00], [1, 3, 1, 1, 2019-02-21T00:06:00]]]|[0, 1, 1, 0, 2019-02-21T00:04:00]|2019-02-21|
+----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------+----------+
Once I have the desired dataframe, I try to drop data_values, but I get this error:
py4j.protocol.Py4JJavaError: An error occurred while calling o58.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: _gen_alias_25#25
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:74)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:96)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReferences(BoundAttribute.scala:96)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:63)
at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:193)
at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:148)
at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:495)
at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:482)
at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:455)
at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:495)
at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:94)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:89)
at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:89)
at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:495)
at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:49)
at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:94)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:89)
at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:89)
at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:39)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:629)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:689)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:173)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:161)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:297)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:334)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: Couldn't find _gen_alias_25#25 in [data_values#5]
at scala.sys.package$.error(package.scala:30)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
The schema of finalData (printed below) contains exactly the fields I need, but calling finalData.show(6) triggers the error above.
root
|-- events: struct (nullable = true)
| |-- active: long (nullable = true)
| |-- rate: long (nullable = true)
| |-- index: long (nullable = true)
| |-- mode: long (nullable = true)
| |-- timestamp: string (nullable = true)
|-- date: string (nullable = true)
I tried creating a new dataframe after dropping data_values and then calling .show(), but I still hit the same problem. My guess is that for some reason the new columns I created still reference data_values, so perhaps I am creating those columns incorrectly?

I have searched online for people running into the same problem, but it does not seem to be a common one, and there is not much information about the gen_alias error.