Пример значений в Аггреагте - PullRequest
0 голосов
/ 01 мая 2018

У меня есть набор данных, в котором мне нужно получить некоторые данные, а также наиболее частые значения.

Пример кадра данных ниже.

from pyspark.sql import Row, functions as F    
row = Row("UK_1","UK_2","Request_Date",'Approval_Date',"Cat",'Country','State' )
test_df = (sc.parallelize
([
    row(1,1,'12/10/2016','10-10-2017',"A",'USA','NY'),
    row(1,2,None,'10-1-2016','A','CAN','QC'),
    row(2,1,'14/10/2016',None,'B','USA','FL'),
    row('A',3,'!~2016/2/276','Bad Date','B','USA',None),
    row(None,1,'26/09/2016','26/11/08','A',None,'ON'),
    row(1,1,'12/10/2016','22-02-20','A',None,None),
    row(1,2,None,'45/45/00','A','MEX','XPZ'),
    row(2,1,'14/10/2016','None','B','DEU','DUS'),
    row(None,None,'!~2016/2/276','12-01-2015','B','',''),
    row(None,1,'26/09/2016',None,'A','USA','CA')
]).toDF())
test_df.show()

enter image description here

У меня есть пример кода, но он не полный.

(
    test_df
    .agg
    (
        F.count('*').alias('count'),
        F.countDistinct('Country').alias('distinct_country')
        #.alias('top_2_countries')
    )
    .show()
)

Ожидаемые результаты, как показано ниже.

enter image description here

Как это сделать.

1 Ответ

0 голосов
/ 01 мая 2018

Значения null в вашем DataFrame вызывают проблемы для агрегирования. Один из вариантов - заменить эти значения чем-то ненулевым для агрегирования.

Например:

new_df = test_df.withColumn(
    "Country",
    F.when(
        F.isnull("Country"),
        "None"
    ).otherwise(F.col("Country"))
)

Возвращает DataFrame, где значения null столбца Country были заменены строкой "None". (Я намеренно избегал использования строки "null", чтобы избежать двусмысленности.)

Теперь вы можете получить количество и ранжировать каждую из стран по их частотам, используя pyspark.sql.functions.rank() и pyspark.sql.Window.

from pyspark.sql import Window
new_df.groupBy("Country")\
    .agg(
        F.count("Country").alias("Count"),
        F.rank().over(Window.orderBy(F.count("Country").desc())).alias("Rank")
    )\
    .show()
#+-------+-----+----+
#|Country|Count|Rank|
#+-------+-----+----+
#|    USA|    4|   1|
#|   None|    2|   2|
#|    MEX|    1|   3|
#|       |    1|   3|
#|    DEU|    1|   3|
#|    CAN|    1|   3|
#+-------+-----+----+

Как видите, из-за замены в столбце графства отображается "None". На данный момент у вас есть все, что вам нужно для вычисления желаемых агрегатов.

  • Первый выходной столбец (count) - это просто сумма столбца Count.
  • Второй выходной столбец (distinct_country) вычисляется аналогично тому, как вы делали это в своем посте.
  • Окончательный выходной столбец (top_2_countries) можно вычислить с помощью pyspark.sql.functions.collect_list(), отфильтровав значения, где rank <= 2.

Например:

new_df.groupBy("Country")\
    .agg(
        F.count("Country").alias("Count"),
        F.rank().over(Window.orderBy(F.count("Country").desc())).alias("Rank")
    )\
    .agg(
        F.sum("Count").alias("count"),
        F.countDistinct("Country").alias("distinct_country"),
        F.collect_list(F.when(F.col("rank")<=2, F.col("Country"))).alias("top_2_countries")
    )\
    .show()
#+-----+----------------+---------------+
#|count|distinct_country|top_2_countries|
#+-----+----------------+---------------+
#|   10|               6|    [USA, None]|
#+-----+----------------+---------------+

Обратите внимание на две вещи здесь. Во-первых, количество равно 6, а не 5 в вашем примере. 5 - результат игнорирования null в countDistinct(). Аналогично, столбец top_2_countries имеет значения [USA, None].

В целях демонстрации, вот что произошло бы, если бы вы преобразовали значения "None" обратно в null:

new_df.groupBy("Country")\
    .agg(
        F.count("Country").alias("Count"),
        F.rank().over(Window.orderBy(F.count("Country").desc())).alias("Rank")
    )\
    .withColumn(
        "Country",
        F.when(F.col("Country") == "None", None).otherwise(F.col("Country"))
    )\
    .agg(
        F.sum("Count").alias("count"),
        F.countDistinct("Country").alias("distinct_country"),
        F.collect_list(F.when(F.col("rank")<=2, F.col("Country"))).alias("top_2_countries")
    )\
    .show()
#+-----+----------------+---------------+
#|count|distinct_country|top_2_countries|
#+-----+----------------+---------------+
#|   10|               5|          [USA]|
#+-----+----------------+---------------+

Как видите, отличное число равно 5, но столбец top_2_countries не содержит null. Это связано с тем, что null исключено из collect_list() 1 . (См. этот пример ).

1 Важно отметить, что я использовал этот факт при вызове collect_list(). Из документов на pyspark.sql.functions.when():

Если Column.otherwise () не вызывается, для несоответствующих условий возвращается None.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...