Spark: how to perform aggregation operations on an array of strings in a DataFrame
0 votes
/ 14 July 2020

I want to group by a column and run some aggregation operations on other columns, such as count, count_distinct or nunique.
For example,

# the sample values in the `date` column are all unique
df.show(7)
+--------------------+---------------------------------+-------------------+---------+
|            category|                             tags|           datetime|     date|
+--------------------+---------------------------------+-------------------+---------+
|                null|      ,industry,display,Merchants|2018-01-08 14:30:32| 20200704|
|        social,smart|    smart,swallow,game,Experience|2019-06-17 04:34:51| 20200705|
|      ,beauty,social|            social,picture,social|2017-08-19 09:01:37| 20200706|
|             default|        default,game,us,adventure|2019-10-02 14:18:56| 20200707|
|financial management|financial management,loan,product|2018-07-17 02:07:39| 20200708|
|              system|  system,font,application,setting|2015-07-18 00:45:57| 20200709|
|                null|     ,system,profile,optimization|2018-09-07 19:59:03| 20200710|

df.printSchema()
root
 |-- category: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- date: string (nullable = true)
# I want to do group aggregations in PySpark like the following pandas code
group_date_tags_cnt_df = df.groupby('date')['tags'].count()
group_date_tags_nunique_df = df.groupby('date')['tags'].nunique()

group_date_category_cnt_df = df.groupby('date')['category'].count()
group_date_category_nunique_df = df.groupby('date')['category'].nunique()

# expected output here
# all results should ignore the empty string produced by the leading ',' after splitting, and `null` values in the aggregations
group_date_tags_cnt_df.show(4)
+---------+---------+
|     date|    count|
+---------+---------+
| 20200704|        3|
| 20200705|        4|
| 20200706|        3|
| 20200707|        4|

group_date_tags_nunique_df.show(4)
+---------+---------------------------------+
|     date|              count(DISTINCT tag)|
+---------+---------------------------------+
| 20200704|                                3|
| 20200705|                                4|
| 20200706|                                3|
| 20200707|                                4|

# It should ignore `null` here
group_date_category_cnt_df.show(4)
+---------+---------+
|     date|    count|
+---------+---------+
| 20200704|        0|
| 20200705|        2|
| 20200706|        2|
| 20200707|        1|

group_date_category_nunique_df.show(4)
+---------+----------------------------+
|     date|    count(DISTINCT category)|
+---------+----------------------------+
| 20200704|                           1|
| 20200705|                           2|
| 20200706|                           2|
| 20200707|                           1|

But the tags and category columns are of type string here, so I think I should first split them and then run the group aggregations on the split results. I'm just not comfortable implementing that, so could someone help me?
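The closest I can get on my own (assuming countDistinct is the PySpark counterpart of pandas' nunique) is a plain aggregation over the whole strings, which obviously does not take the comma-separated values apart:

from pyspark.sql.functions import count, countDistinct

# this counts whole strings per date, not the individual comma-separated tags
group_date_tags_cnt_df = df.groupBy('date').agg(count('tags').alias('count'))
group_date_tags_nunique_df = df.groupBy('date').agg(countDistinct('tags'))

So the missing piece is how to split/explode the values first and still ignore the empty strings and the null values.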

Answers [ 2 ]

1 vote
/ 14 July 2020

PySpark code that solves your problem; I took data for 3 dates: 20200702, 20200704 and 20200705.

from pyspark.sql import Row
from pyspark.sql.functions import *

drow = Row("category", "tags", "datetime", "date")

data = [
    drow("", ",industry,display,Merchants", "2018-01-08 14:30:32", "20200704"),
    drow("social,smart", "smart,swallow,game,Experience", "2019-06-17 04:34:51", "20200702"),
    drow(",beauty,social", "social,picture,social", "2017-08-19 09:01:37", "20200705"),
]
df = spark.createDataFrame(data)

# split the comma-separated strings into arrays, then explode them one value per row;
# a null array is replaced with a single empty-string element so the row is not lost
final_df = (df.withColumn("category", split(df['category'], ","))
              .withColumn("tags", split(df['tags'], ","))
              .select('datetime', 'date', 'tags',
                      explode(when(col("category").isNotNull(), col("category"))
                              .otherwise(array(lit("").cast("string")))).alias("category"))
              .select('datetime', 'date', 'category',
                      explode(when(col("tags").isNotNull(), col("tags"))
                              .otherwise(array(lit("").cast("string")))).alias("tags")))

final_df.show()
'''
+-------------------+--------+--------+----------+
|           datetime|    date|category|      tags|
+-------------------+--------+--------+----------+
|2018-01-08 14:30:32|20200704|        |          |
|2018-01-08 14:30:32|20200704|        |  industry|
|2018-01-08 14:30:32|20200704|        |   display|
|2018-01-08 14:30:32|20200704|        | Merchants|
|2019-06-17 04:34:51|20200702|  social|     smart|
|2019-06-17 04:34:51|20200702|  social|   swallow|
|2019-06-17 04:34:51|20200702|  social|      game|
|2019-06-17 04:34:51|20200702|  social|Experience|
|2019-06-17 04:34:51|20200702|   smart|     smart|
|2019-06-17 04:34:51|20200702|   smart|   swallow|
|2019-06-17 04:34:51|20200702|   smart|      game|
|2019-06-17 04:34:51|20200702|   smart|Experience|
|2017-08-19 09:01:37|20200705|        |    social|
|2017-08-19 09:01:37|20200705|        |   picture|
|2017-08-19 09:01:37|20200705|        |    social|
|2017-08-19 09:01:37|20200705|  beauty|    social|
|2017-08-19 09:01:37|20200705|  beauty|   picture|
|2017-08-19 09:01:37|20200705|  beauty|    social|
|2017-08-19 09:01:37|20200705|  social|    social|
|2017-08-19 09:01:37|20200705|  social|   picture|
+-------------------+--------+--------+----------+
only showing top 20 rows'''


final_df.groupBy('date','tags').count().show()
'''
+--------+----------+-----+
|    date|      tags|count|
+--------+----------+-----+
|20200702|     smart|    2|
|20200705|   picture|    3|
|20200702|   swallow|    2|
|20200704|  industry|    1|
|20200704|   display|    1|
|20200702|      game|    2|
|20200704|          |    1|
|20200704| Merchants|    1|
|20200702|Experience|    2|
|20200705|    social|    6|
+--------+----------+-----+
'''

final_df.groupBy('date','category').count().show()
'''
+--------+--------+-----+
|    date|category|count|
+--------+--------+-----+
|20200702|   smart|    4|
|20200702|  social|    4|
|20200705|        |    3|
|20200705|  beauty|    3|
|20200704|        |    4|
|20200705|  social|    3|
+--------+--------+-----+
'''
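Note that final_df is the cross product of the exploded category and tags values (every tag is repeated once per category element), so the per-date totals from the question should not be taken from it directly. One way to reproduce them (a sketch, assuming the empty strings left by the leading commas just need to be dropped) is to explode each column separately and filter before aggregating:

from pyspark.sql.functions import split, explode, col, count, countDistinct

# explode only the tags column and drop the empty strings left by leading commas
tags_exploded = df.withColumn("tag", explode(split(col("tags"), ","))).filter(col("tag") != "")
tags_exploded.groupBy("date").agg(
    count("tag").alias("count"),
    countDistinct("tag").alias("count(DISTINCT tag)")
).show()

# same idea for category; rows with a null category are dropped automatically,
# because explode skips null arrays
category_exploded = df.withColumn("cat", explode(split(col("category"), ","))).filter(col("cat") != "")
category_exploded.groupBy("date").agg(
    count("cat").alias("count"),
    countDistinct("cat").alias("count(DISTINCT category)")
).show()

Dates whose only category is null will simply not appear in the category result; if a 0 row is required, the result can be joined back to the distinct dates.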
1 vote
/ 14 July 2020
The same split-and-explode approach in Scala:
  case class d(
              category: Option[String],
              tags: String,
              datetime: String,
              date: String
              )

  val sourceDF = Seq(
    d(None, ",industry,display,Merchants", "2018-01-08 14:30:32", "20200704"),
    d(Some("social,smart"), "smart,swallow,game,Experience", "2019-06-17 04:34:51", "20200704"),
    d(Some(",beauty,social"), "social,picture,social", "2017-08-19 09:01:37", "20200704")
  ).toDF("category", "tags", "datetime", "date")


  val df1 = sourceDF.withColumn("category", split('category, ","))
    .withColumn("tags", split('tags, ","))


  val df2 = df1.select('datetime, 'date, 'tags,
    explode(
      when(col("category").isNotNull, col("category"))
        .otherwise(array(lit(null).cast("string")))).alias("category")
  )

  val df3 = df2.select('category, 'datetime, 'date,
    explode(
      when(col("tags").isNotNull, col("tags"))
        .otherwise(array(lit(null).cast("string")))).alias("tags")
  )

  val resDF = df3.select('category, 'tags, 'datetime, 'date)

  resDF.show
//  +--------+----------+-------------------+--------+
//  |category|      tags|           datetime|    date|
//  +--------+----------+-------------------+--------+
//  |    null|          |2018-01-08 14:30:32|20200704|
//  |    null|  industry|2018-01-08 14:30:32|20200704|
//  |    null|   display|2018-01-08 14:30:32|20200704|
//  |    null| Merchants|2018-01-08 14:30:32|20200704|
//  |  social|     smart|2019-06-17 04:34:51|20200704|
//  |  social|   swallow|2019-06-17 04:34:51|20200704|
//  |  social|      game|2019-06-17 04:34:51|20200704|
//  |  social|Experience|2019-06-17 04:34:51|20200704|
//  |   smart|     smart|2019-06-17 04:34:51|20200704|
//  |   smart|   swallow|2019-06-17 04:34:51|20200704|
//  |   smart|      game|2019-06-17 04:34:51|20200704|
//  |   smart|Experience|2019-06-17 04:34:51|20200704|
//  |        |    social|2017-08-19 09:01:37|20200704|
//  |        |   picture|2017-08-19 09:01:37|20200704|
//  |        |    social|2017-08-19 09:01:37|20200704|
//  |  beauty|    social|2017-08-19 09:01:37|20200704|
//  |  beauty|   picture|2017-08-19 09:01:37|20200704|
//  |  beauty|    social|2017-08-19 09:01:37|20200704|
//  |  social|    social|2017-08-19 09:01:37|20200704|
//  |  social|   picture|2017-08-19 09:01:37|20200704|
//  +--------+----------+-------------------+--------+




  val group1DF = resDF.groupBy('date, 'category).count()
  group1DF.show
//  +--------+--------+-----+
//  |    date|category|count|
//  +--------+--------+-----+
//  |20200704|  social|    7|
//  |20200704|        |    3|
//  |20200704|   smart|    4|
//  |20200704|  beauty|    3|
//  |20200704|    null|    4|
//  +--------+--------+-----+

  val group2DF = resDF.groupBy('datetime, 'category).count()
  group2DF.show
//  +-------------------+--------+-----+
//  |           datetime|category|count|
//  +-------------------+--------+-----+
//  |2017-08-19 09:01:37|  social|    3|
//  |2017-08-19 09:01:37|  beauty|    3|
//  |2019-06-17 04:34:51|   smart|    4|
//  |2019-06-17 04:34:51|  social|    4|
//  |2018-01-08 14:30:32|    null|    4|
//  |2017-08-19 09:01:37|        |    3|
//  +-------------------+--------+-----+