Создание одного фрейма данных из двух фреймов данных в виде отдельных подстолбцов в pyspark - PullRequest
0 голосов
/ 06 августа 2020

Я хочу поместить два фрейма данных в один, чтобы каждый из них был вложенным столбцом, а не объединением фреймов данных. Итак, у меня есть два фрейма данных, stat1_df и stat2_df, и они выглядят примерно так:

root
 |-- max_scenes: integer (nullable = true)
 |-- median_scenes: double (nullable = false)
 |-- avg_scenes: double (nullable = true)

+----------+-------------+------------------+
|max_scenes|median_scenes|avg_scenes        |
+----------+-------------+------------------+
|97        |7.0          |10.806451612903226|
|97        |7.0          |10.806451612903226|
|97        |7.0          |10.806451612903226|
|97        |7.0          |10.806451612903226|
+----------+-------------+------------------+


root
 |-- max: double (nullable = true)
 |-- type: string (nullable = true)

+-----+-----------+
|max  |type       |
+-----+-----------+
|10.0 |small      |
|25.0 |medium     |
|50.0 |large      |
|250.0|extra_large|
+-----+-----------+

, и я хочу, чтобы result_df был как:

root
 |-- some_statistics1: struct (nullable = true)
 |    |-- max_scenes: integer (nullable = true)
      |-- median_scenes: double (nullable = false)
      |-- avg_scenes: double (nullable = true)
 |-- some_statistics2: struct (nullable = true)
 |    |-- max: double (nullable = true)
      |-- type: string (nullable = true)

Есть ли способ поставить эти двое, как показано? stat1_df и stat2_df - простые фреймы данных, без массивов и вложенных столбцов. Окончательный фрейм данных записывается в mongodb. Если возникнут дополнительные вопросы, я здесь, чтобы ответить.

1 Ответ

1 голос
/ 07 августа 2020

Проверьте код ниже.

Добавьте id столбец в оба DataFrame, переместите все столбцы в структуру, а затем используйте join оба DataFrame

scala> val dfa = Seq(("10","8.9","7.9")).toDF("max_scenes","median_scenes","avg_scenes")
dfa: org.apache.spark.sql.DataFrame = [max_scenes: string, median_scenes: string ... 1 more field]

scala> dfa.show(false)
+----------+-------------+----------+
|max_scenes|median_scenes|avg_scenes|
+----------+-------------+----------+
|10        |8.9          |7.9       |
+----------+-------------+----------+


scala> dfa.printSchema
root
 |-- max_scenes: string (nullable = true)
 |-- median_scenes: string (nullable = true)
 |-- avg_scenes: string (nullable = true)


scala> val mdfa = dfa.select(struct($"*").as("some_statistics1")).withColumn("id",monotonically_increasing_id)
mdfa: org.apache.spark.sql.DataFrame = [some_statistics1: struct<max_scenes: string, median_scenes: string ... 1 more field>, id: bigint]

scala> mdfa.printSchema
root
 |-- some_statistics1: struct (nullable = false)
 |    |-- max_scenes: string (nullable = true)
 |    |-- median_scenes: string (nullable = true)
 |    |-- avg_scenes: string (nullable = true)
 |-- id: long (nullable = false)


scala> mdfa.show(false)
+----------------+---+
|some_statistics1|id |
+----------------+---+
|[10,8.9,7.9]    |0  |
+----------------+---+


scala> val dfb = Seq(("11.2","sample")).toDF("max","type")
dfb: org.apache.spark.sql.DataFrame = [max: string, type: string]

scala> dfb.printSchema
root
 |-- max: string (nullable = true)
 |-- type: string (nullable = true)


scala> dfb.show(false)
+----+------+
|max |type  |
+----+------+
|11.2|sample|
+----+------+


scala> val mdfb = dfb.select(struct($"*").as("some_statistics2")).withColumn("id",monotonically_increasing_id)
mdfb: org.apache.spark.sql.DataFrame = [some_statistics2: struct<max: string, type: string>, id: bigint]

scala> mdfb.printSchema
root
 |-- some_statistics2: struct (nullable = false)
 |    |-- max: string (nullable = true)
 |    |-- type: string (nullable = true)
 |-- id: long (nullable = false)


scala> mdfb.show(false)
+----------------+---+
|some_statistics2|id |
+----------------+---+
|[11.2,sample]   |0  |
+----------------+---+


scala> mdfa.join(mdfb,Seq("id"),"inner").drop("id").printSchema
root
 |-- some_statistics1: struct (nullable = false)
 |    |-- max_scenes: string (nullable = true)
 |    |-- median_scenes: string (nullable = true)
 |    |-- avg_scenes: string (nullable = true)
 |-- some_statistics2: struct (nullable = false)
 |    |-- max: string (nullable = true)
 |    |-- type: string (nullable = true)


scala> mdfa.join(mdfb,Seq("id"),"inner").drop("id").show(false)
+----------------+----------------+
|some_statistics1|some_statistics2|
+----------------+----------------+
|[10,8.9,7.9]    |[11.2,sample]   |
+----------------+----------------+
...