pyspark dataframe json разложение столбца - PullRequest
2 голосов
/ 04 августа 2020

Я пытаюсь разложить столбец json в фреймворке pyspark.

Это похоже на вопрос в фрейм данных pyspark с json столбцом, чтобы агрегировать элементы json в новый столбец и удалите дублированные

, но этот новый столбец json имеет более сложную структуру.

Dataframe

 year month id json_col
 2010  08   5  {"my_p": [{"like": false, "p_id": "dfvefvsd"}, {"like": true, "p_id": "dvcdc"}], "p_id": "cdscas"} 

Мне нужен новый столбец как:

year month id like  p_id
2010  8    5  false dfvefvsd
2010  8    5  true  dvcdc
2010  8    5  null  cdscas

Если есть дублированные p_id в том же году, месяце, id, удалите его.

Код, полученный по приведенной выше ссылке (кредит @Shu)

from pyspark.sql import functions as F
from pyspark.sql.types import *

t = spark.sql('select * from my_db.my_tab')

schema = ArrayType(
                StructType(
                  [
                    StructField('my_p', 
                                        StructType(
                                                  [StructField('p_id', StringType(), True),
                                                  StructField('like', BooleanType(), True)
                                                  ]
                                        ),
                               True), 
                   StructField('p_id', StringType(), True)
                  ]
                  
                )
            )

   t1 = t.withColumn('a_col', F.from_json('json_col', schema)).select('year', 'month', 'id', 'p_id', F.expr('transform(json_col, f -> f.p_id)').alias('tmp'))

   t1.groupBy("year","month", 'id', 'p_id').agg(F.to_json(F.array_distinct(F.flatten(F.collect_list(F.col("tmp"))))).alias("new_col")).show(10,False)

Но из json_col раскладывается только первый p_id.

спасибо

1 Ответ

1 голос
/ 04 августа 2020

попробуйте это

t.show()

#+----+-----+---+--------------------------------------------------------------------------------------------------+
#|year|month|id |json_col                                                                                          |
#+----+-----+---+--------------------------------------------------------------------------------------------------+
#|2010|08   |5  |{"my_p": [{"like": false, "p_id": "dfvefvsd"}, {"like": true, "p_id": "dvcdc"}], "p_id": "cdscas"}|
#+----+-----+---+--------------------------------------------------------------------------------------------------+


from pyspark.sql import functions as F
from pyspark.sql.window import Window

schema1='struct<my_p:array<struct<like:boolean,p_id:string>>,p_id:string>'

w=Window().partitionBy("p_id2").orderBy(F.lit(0))


t.withColumn("json_col", F.from_json("json_col",schema1))\
  .select("*","json_col.*").drop("json_col")\
  .withColumnRenamed("p_id","p_id2").select("*",F.expr("""inline(my_p)""")).drop("my_p")\
  .withColumn('num', F.row_number().over(w)).withColumn("p_id", F.when(F.col("num")==1, F.array("p_id2","p_id"))\
                                                                .otherwise(F.array("p_id"))).drop("num","p_id2")\
  .withColumn("p_id", F.explode("p_id")).show()

#+----+-----+---+-----+--------+
#|year|month| id| like|    p_id|
#+----+-----+---+-----+--------+
#|2010|   08|  5| true|   dvcdc|
#|2010|   08|  5|false|dfvefvsd|
#|2010|   08|  5|false|  cdscas|
#+----+-----+---+-----+--------+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...