Unable to drop the column I used to explode an array - PullRequest
1 vote
/ 07 May 2020

As a preface, I am using pyspark==2.4.5.

When reading from the JSON file that can be found here: https://filebin.net/53rhhigep2zpqdga, I need to explode the data. I also do not need data after the explode, and I do not need statistics either.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master('local[2]').appName("createDataframe")\
        .getOrCreate()
json_data = spark.read.option('multiline', True).json(file_name)
json_data = json_data.withColumn("data_values", F.explode_outer("data"))\
        .drop("data", "statistics")

Below you can see the schema and the top 5 rows of json_data:

root
 |-- data_values: struct (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- events: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- active: long (nullable = true)
 |    |    |    |-- index: long (nullable = true)
 |    |    |    |-- mode: long (nullable = true)
 |    |    |    |-- rate: long (nullable = true)
 |    |    |    |-- timestamp: string (nullable = true)

+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|data_values                                                                                                                                               |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|[2019-02-20, [[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]]                                   |
|[2019-02-21, [[1, 0, 1, 0, 2019-02-21T00:03:00], [0, 1, 1, 0, 2019-02-21T00:04:00], [1, 2, 1, 1, 2019-02-21T00:05:00], [1, 3, 1, 1, 2019-02-21T00:06:00]]]|
|[2019-02-22, [[1, 0, 1, 0, 2019-02-22T00:03:00], [0, 1, 1, 0, 2019-02-22T00:04:00], [1, 2, 1, 1, 2019-02-22T00:05:00], [1, 3, 1, 1, 2019-02-22T00:06:00]]]|
|[2019-02-23, [[1, 3, 1, 1, 2019-02-23T00:16:00]]]                                                                                                         |
|[2019-02-24, [[1, 0, 1, 1, 2019-02-24T00:03:00], [1, 1, 1, 0, 2019-02-24T00:04:00]]]                                                                      |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+

Now, to get the data I need, I run the following queries.

newData = json_data\
   .withColumn("events", F.explode(json_data.data_values.events))\
   .withColumn("date", json_data.data_values.date)
newData.printSchema()
newData.show(3)
finalData = newData.drop("data_values")
finalData.show(6)

Above, you can see that I create a column named data_values that explodes my incoming JSON data. I then create columns to pull events and date out of data_values. Below you can see what the schema looks like, along with the top 5 rows.

root
 |-- data_values: struct (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- events: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- active: long (nullable = true)
 |    |    |    |-- index: long (nullable = true)
 |    |    |    |-- mode: long (nullable = true)
 |    |    |    |-- rate: long (nullable = true)
 |    |    |    |-- timestamp: string (nullable = true)
 |-- events: struct (nullable = true)
 |    |-- active: long (nullable = true)
 |    |-- index: long (nullable = true)
 |    |-- mode: long (nullable = true)
 |    |-- rate: long (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- date: string (nullable = true)

+----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------+----------+
|data_values                                                                                                                                               |events                           |date      |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------+----------+
|[2019-02-20, [[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]]                                   |[0, 0, 1, 0, 2019-02-20T00:00:00]|2019-02-20|
|[2019-02-20, [[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]]                                   |[0, 1, 1, 0, 2019-02-20T00:01:00]|2019-02-20|
|[2019-02-20, [[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]]                                   |[0, 2, 1, 0, 2019-02-20T00:02:00]|2019-02-20|
|[2019-02-21, [[1, 0, 1, 0, 2019-02-21T00:03:00], [0, 1, 1, 0, 2019-02-21T00:04:00], [1, 2, 1, 1, 2019-02-21T00:05:00], [1, 3, 1, 1, 2019-02-21T00:06:00]]]|[1, 0, 1, 0, 2019-02-21T00:03:00]|2019-02-21|
|[2019-02-21, [[1, 0, 1, 0, 2019-02-21T00:03:00], [0, 1, 1, 0, 2019-02-21T00:04:00], [1, 2, 1, 1, 2019-02-21T00:05:00], [1, 3, 1, 1, 2019-02-21T00:06:00]]]|[0, 1, 1, 0, 2019-02-21T00:04:00]|2019-02-21|
+----------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------+----------+

Once I have the dataframe I want, I try to drop data_values, but I get this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o58.showString.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: _gen_alias_25#25
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:75)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:74)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$3(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:376)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:214)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:374)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:327)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:291)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:275)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:74)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:96)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.immutable.List.map(List.scala:298)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReferences(BoundAttribute.scala:96)
        at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:63)
        at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:193)
        at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:148)
        at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:495)
        at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:482)
        at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:455)
        at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:495)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:94)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
        at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:89)
        at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:89)
        at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:495)
        at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:49)
        at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:94)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
        at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:89)
        at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:89)
        at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:39)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:629)
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:689)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:173)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
        at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:161)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2581)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2581)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2788)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:297)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:334)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: Couldn't find _gen_alias_25#25 in [data_values#5]
        at scala.sys.package$.error(package.scala:30)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.$anonfun$applyOrElse$1(BoundAttribute.scala:81)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)

The schema of finalData has the fields I need, but calling finalData.show() raises the error shown above.

root
 |-- events: struct (nullable = true)
 |    |-- active: long (nullable = true)
 |    |-- rate: long (nullable = true)
 |    |-- index: long (nullable = true)
 |    |-- mode: long (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |-- date: string (nullable = true)

I tried creating a new dataframe after dropping data_values and then running .show() on it, but I still run into the same problem. I assume that for some reason the new columns I created still reference data_values, so maybe I am creating these columns incorrectly?

I tried searching online for people who have run into the same problem, but it does not seem to be a common one, since there is not much information about the gen_alias error.
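
For reference, one way to see which generated alias the projection still refers to is to print the query plan before calling an action. This is only a debugging sketch, assuming the finalData dataframe built in the code above; it does not fix the error:

# Print the parsed/analyzed/optimized/physical plans; the dangling
# _gen_alias_* attribute shows up in the projection that fails to bind.
finalData.explain(True)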

1 Answer

1 vote
/ 07 May 2020

Here I am using Spark 2.4.3.

Point 1: update the path and read the file

>>> from pyspark.sql import functions as F
>>> json_data = spark.read.option('multiline', True).json("/home/maheshpersonal/stack.json")
>>> json_data.show(truncate=False)
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|data                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[[2019-02-20, [[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]], [2019-02-21, [[1, 0, 1, 0, 2019-02-21T00:03:00], [0, 1, 1, 0, 2019-02-21T00:04:00], [1, 2, 1, 1, 2019-02-21T00:05:00], [1, 3, 1, 1, 2019-02-21T00:06:00]]], [2019-02-22, [[1, 0, 1, 0, 2019-02-22T00:03:00], [0, 1, 1, 0, 2019-02-22T00:04:00], [1, 2, 1, 1, 2019-02-22T00:05:00], [1, 3, 1, 1, 2019-02-22T00:06:00]]], [2019-02-23, [[1, 3, 1, 1, 2019-02-23T00:16:00]]], [2019-02-24, [[1, 0, 1, 1, 2019-02-24T00:03:00], [1, 1, 1, 0, 2019-02-24T00:04:00]]]]|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Point 2: check the schema

>>> json_data.printSchema()
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- events: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- active: long (nullable = true)
 |    |    |    |    |-- index: long (nullable = true)
 |    |    |    |    |-- mode: long (nullable = true)
 |    |    |    |    |-- rate: long (nullable = true)
 |    |    |    |    |-- timestamp: string (nullable = true)

Point 3: explode the data column

>>> json_data_1 = json_data.withColumn("data_values", F.explode_outer("data"))
>>> json_data_1.printSchema()
root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- events: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- active: long (nullable = true)
 |    |    |    |    |-- index: long (nullable = true)
 |    |    |    |    |-- mode: long (nullable = true)
 |    |    |    |    |-- rate: long (nullable = true)
 |    |    |    |    |-- timestamp: string (nullable = true)
 |-- data_values: struct (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- events: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- active: long (nullable = true)
 |    |    |    |-- index: long (nullable = true)
 |    |    |    |-- mode: long (nullable = true)
 |    |    |    |-- rate: long (nullable = true)
 |    |    |    |-- timestamp: string (nullable = true)

Point 4: select the columns as required

>>> newData = json_data_1.withColumn("events", json_data_1.data_values.events).withColumn("date", json_data_1.data_values.date)

>>> newData.show()
+--------------------+--------------------+--------------------+----------+
|                data|         data_values|              events|      date|
+--------------------+--------------------+--------------------+----------+
|[[2019-02-20, [[0...|[2019-02-20, [[0,...|[[0, 0, 1, 0, 201...|2019-02-20|
|[[2019-02-20, [[0...|[2019-02-21, [[1,...|[[1, 0, 1, 0, 201...|2019-02-21|
|[[2019-02-20, [[0...|[2019-02-22, [[1,...|[[1, 0, 1, 0, 201...|2019-02-22|
|[[2019-02-20, [[0...|[2019-02-23, [[1,...|[[1, 3, 1, 1, 201...|2019-02-23|
|[[2019-02-20, [[0...|[2019-02-24, [[1,...|[[1, 0, 1, 1, 201...|2019-02-24|
+--------------------+--------------------+--------------------+----------+

Point 5: drop the data column from the dataframe

>>> newData_v1 = newData.drop(newData.data)
>>> newData_v1.show()
+--------------------+--------------------+----------+
|         data_values|              events|      date|
+--------------------+--------------------+----------+
|[2019-02-20, [[0,...|[[0, 0, 1, 0, 201...|2019-02-20|
|[2019-02-21, [[1,...|[[1, 0, 1, 0, 201...|2019-02-21|
|[2019-02-22, [[1,...|[[1, 0, 1, 0, 201...|2019-02-22|
|[2019-02-23, [[1,...|[[1, 3, 1, 1, 201...|2019-02-23|
|[2019-02-24, [[1,...|[[1, 0, 1, 1, 201...|2019-02-24|
+--------------------+--------------------+----------+

Point 6: drop the data_values column from newData_v1

>>> finalDataframe = newData_v1.drop(newData_v1.data_values)
>>> finalDataframe.show(truncate = False)
+--------------------------------------------------------------------------------------------------------------------------------------------+----------+
|events                                                                                                                                      |date      |
+--------------------------------------------------------------------------------------------------------------------------------------------+----------+
|[[0, 0, 1, 0, 2019-02-20T00:00:00], [0, 1, 1, 0, 2019-02-20T00:01:00], [0, 2, 1, 0, 2019-02-20T00:02:00]]                                   |2019-02-20|
|[[1, 0, 1, 0, 2019-02-21T00:03:00], [0, 1, 1, 0, 2019-02-21T00:04:00], [1, 2, 1, 1, 2019-02-21T00:05:00], [1, 3, 1, 1, 2019-02-21T00:06:00]]|2019-02-21|
|[[1, 0, 1, 0, 2019-02-22T00:03:00], [0, 1, 1, 0, 2019-02-22T00:04:00], [1, 2, 1, 1, 2019-02-22T00:05:00], [1, 3, 1, 1, 2019-02-22T00:06:00]]|2019-02-22|
|[[1, 3, 1, 1, 2019-02-23T00:16:00]]                                                                                                         |2019-02-23|
|[[1, 0, 1, 1, 2019-02-24T00:03:00], [1, 1, 1, 0, 2019-02-24T00:04:00]]                                                                      |2019-02-24|
+--------------------------------------------------------------------------------------------------------------------------------------------+----------+

The lesson from this: always use a new dataframe to hold each transformation. Check whether this helps you :)
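
To make the pattern concrete, here is a condensed sketch that stitches Points 1 through 6 together, with every transformation assigned to a fresh dataframe variable instead of overwriting the previous one. The file path is a placeholder; adjust it to where you saved the JSON:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[2]").appName("createDataframe").getOrCreate()

# Read the multiline JSON (replace the path with your own file location).
json_data = spark.read.option("multiline", True).json("/path/to/stack.json")

# Each step goes into its own dataframe rather than reassigning the same name.
json_data_1 = json_data.withColumn("data_values", F.explode_outer("data"))
newData = json_data_1.withColumn("events", json_data_1.data_values.events) \
                     .withColumn("date", json_data_1.data_values.date)
newData_v1 = newData.drop(newData.data)
finalDataframe = newData_v1.drop(newData_v1.data_values)

finalDataframe.show(truncate=False)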
