фрейм данных pyspark со столбцом json для агрегирования элементов json в новый столбец и удаления повторяющихся - PullRequest
1 голос
/ 02 августа 2020
• 1000 и агрегированный по году и месяцу
  year month p_id (string)
  2010 09    ["vfdvtbe", "cdscs", "usdvwq", "ujhbe", "yjev"]
  2007 10    ["ukerge", "ikrtw", "ikwca", "unvwq", "cqwcq"]

новый столбец «p_id» представляет собой строку массива. Я хотел бы подсчитать, какие разные "p_id" и сколько их в каждый год и месяц. А также удалите повторяющиеся элементы, которые появляются в том же году и месяце.

Мой код:

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = ArrayType(StructType(
[
   StructField('p_id', StringType(), True)
]
))

schema = ArrayType(MapType(StringType(),StringType()))

t = ff.withColumn("data",F.explode(F.from_json(F.col("json_col"),schema))).withColumn("data",F.when(F.col("data")["product_id"].cast("string").isNotNull(),F.col("data")["product_id"])).filter(F.col("data").isNotNull()).drop("json_col")


display(t)

Я не уверен, что это может удалить дубликаты?

спасибо

1 Ответ

1 голос
/ 03 августа 2020

Используйте flatten, array_distinct с функциями groupBy, collect_list для этого случая.

Example:

df.show(10,False)
#+----+-----+---------------------------------------------------------+
#|year|month|json_col                                                 |
#+----+-----+---------------------------------------------------------+
#|2010|09   |[{"p_id":"vfdvtbe"}, {"p_id":"cdscs"}, {"p_id":"usdvwq"}]|
#|2010|09   |[{"p_id":"ujhbe"}, {"p_id":"cdscs"}, {"p_id":"yjev"}]    |
#|2007|10   |[{"p_id":"ukerge"}, {"p_id":"ikrtw"}, {"p_id":"ikwca"}]  |
#|2007|10   |[{"p_id":"unvwq"}, {"p_id":"cqwcq"}, {"p_id":"ikwca"}]   |
#+----+-----+---------------------------------------------------------+

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = ArrayType(StructType(
[
   StructField('p_id', StringType(), True)
]
))


df1=df.withColumn("ff",from_json(col("json_col"),schema)).\
select("year","month",expr('transform(ff , f -> f.p_id)').alias("tmp"))

df1.groupBy("year","month").\
agg(to_json(array_distinct(flatten(collect_list(col("tmp"))))).alias("p_id")).\
show(10,False)
#+----+-----+-------------------------------------------+
#|year|month|p_id                                       |
#+----+-----+-------------------------------------------+
#|2010|09   |["vfdvtbe","cdscs","usdvwq","ujhbe","yjev"]|
#|2007|10   |["ukerge","ikrtw","ikwca","unvwq","cqwcq"] |
#+----+-----+-------------------------------------------+
...