pyspark: collect_list () как сохранить список оставшихся элементов группы в строке - PullRequest
0 голосов
/ 28 августа 2018

Мой набор данных сгруппирован по двум переменным: 'customer' и 'sku' с использованием функции collect_list (), я хочу, чтобы каждая строка сохраняла переменную, сколько строк осталось в группе.

У меня есть такой вывод:

+----------+--------------------+-----------+--------------------+---+
|  customer|                 sku|auto_create|        next_creates|  n|
+----------+--------------------+-----------+--------------------+---+
|248274_ARC|J31/H01N2-D35MM2-...|          Y|           [Y, Y, Y]|  3|
|248274_ARC|J31/H01N2-D35MM2-...|          Y|           [Y, Y, Y]|  3|
|248274_ARC|J31/H01N2-D35MM2-...|          Y|           [Y, Y, Y]|  3|
|297945_ARC|  F87/012V55WH31EX10|          Y|        [Y, Y, Y, Y]|  4|
|297945_ARC|  F87/012V55WH31EX10|          Y|        [Y, Y, Y, Y]|  4|
|297945_ARC|  F87/012V55WH31EX10|          Y|        [Y, Y, Y, Y]|  4|
|297945_ARC|  F87/012V55WH31EX10|          Y|        [Y, Y, Y, Y]|  4|
|318725_ARC|          605/85524V|          N|           [N, N, N]|  3|
|318725_ARC|          605/85524V|          N|           [N, N, N]|  3|
|318725_ARC|          605/85524V|          N|           [N, N, N]|  3|
|403787_ARC|     BPC/77/9601-136|          N|  [N, N, N, Y, Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          N|  [N, N, N, Y, Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          N|  [N, N, N, Y, Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          Y|  [N, N, N, Y, Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          Y|  [N, N, N, Y, Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          Y|  [N, N, N, Y, Y, Y]|  6|
|434238_ARC|        BB8/40300142|          Y|        [Y, Y, Y, Y]|  4|
|434238_ARC|        BB8/40300142|          Y|        [Y, Y, Y, Y]|  4|
|434238_ARC|        BB8/40300142|          Y|        [Y, Y, Y, Y]|  4|
|434238_ARC|        BB8/40300142|          Y|        [Y, Y, Y, Y]|  4|
+----------+--------------------+-----------+--------------------+---+

и я хочу этот вывод:

+----------+--------------------+-----------+--------------------+---+
|  customer|                 sku|auto_create|        next_creates|  n|
+----------+--------------------+-----------+--------------------+---+
|248274_ARC|J31/H01N2-D35MM2-...|          Y|           [Y, Y, Y]|  3|
|248274_ARC|J31/H01N2-D35MM2-...|          Y|              [Y, Y]|  3|
|248274_ARC|J31/H01N2-D35MM2-...|          Y|                 [Y]|  3|
|297945_ARC|  F87/012V55WH31EX10|          Y|        [Y, Y, Y, Y]|  4|
|297945_ARC|  F87/012V55WH31EX10|          Y|           [Y, Y, Y]|  4|
|297945_ARC|  F87/012V55WH31EX10|          Y|              [Y, Y]|  4|
|297945_ARC|  F87/012V55WH31EX10|          Y|                 [Y]|  4|
|318725_ARC|          605/85524V|          N|           [N, N, N]|  3|
|318725_ARC|          605/85524V|          N|              [N, N]|  3|
|318725_ARC|          605/85524V|          N|                 [N]|  3|
|403787_ARC|     BPC/77/9601-136|          N|  [N, N, N, Y, Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          N|     [N, N, Y, Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          N|        [N, Y, Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          Y|           [Y, Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          Y|              [Y, Y]|  6|
|403787_ARC|     BPC/77/9601-136|          Y|                 [Y]|  6|
|434238_ARC|        BB8/40300142|          Y|        [Y, Y, Y, Y]|  4|
|434238_ARC|        BB8/40300142|          Y|           [Y, Y, Y]|  4|
|434238_ARC|        BB8/40300142|          Y|              [Y, Y]|  4|
|434238_ARC|        BB8/40300142|          Y|                 [Y]|  4|
+----------+--------------------+-----------+--------------------+---+

Я использую следующий код:

w = \
Window.partitionBy('customer','sku').orderBy('customer','sku')
analysis = analysis \
    .withColumn('next_creates', collect_list('auto_create').over(w)) 

Предложение из ответов, которые выдавали ошибку при попытке присоединиться:

analysis = analysis.withColumn('rownumber',row_number().over(w).alias('rownumber'))

df1 = analysis
df2 = analysis

df1.join(df2, (df1.customer == df2.customer) & (df1.sku == df2.sku) & (df1.rownumber <= df2.rownumber)).groupBy('customer', 'sku').agg(collect_list('auto_create'))

Ответы [ 3 ]

0 голосов
/ 29 августа 2018

решение:

Я добавил столбец 'rownumber':

df = df.withColumn('rownumber',row_number().over(w).alias('rownumber'))

затем я изменил длину 'next_creates' следующим образом:

df = df.withColumn('next_creates', df.next_creates[rownumber-1:])
0 голосов
/ 30 августа 2018

, если вы заинтересованы в более искробезопасном решении: Вы можете использовать rowBetween для своего окна и иметь что-то вроде этого:

#create a test dataframe
test_df = spark.createDataFrame([
    ("318725_ARC","605/85524V","N"), ("318725_ARC","605/85524V","N"),
    ("318725_ARC","605/85524V","N"),("403787_ARC","BPC/77/9601-136","N"),
    ("403787_ARC","BPC/77/9601-136","N"),("403787_ARC","BPC/77/9601-136","N"),
    ("403787_ARC","BPC/77/9601-136","Y"),("403787_ARC","BPC/77/9601-136","Y"),
    ("403787_ARC","BPC/77/9601-136","Y")], ("customer", "sku","auto_create"))

w = Window.partitionBy('customer','sku').orderBy('customer','sku').rowsBetween(0,Window.unboundedFollowing)
analysis = test_df.withColumn('next_creates',collect_list('auto_create').over(w)) 

analysis.show()

и вывод:

+----------+---------------+-----------+------------------+
|  customer|            sku|auto_create|      next_creates|
+----------+---------------+-----------+------------------+
|318725_ARC|     605/85524V|          N|         [N, N, N]|
|318725_ARC|     605/85524V|          N|            [N, N]|
|318725_ARC|     605/85524V|          N|               [N]|
|403787_ARC|BPC/77/9601-136|          N|[N, Y, N, Y, N, Y]|
|403787_ARC|BPC/77/9601-136|          Y|   [Y, N, Y, N, Y]|
|403787_ARC|BPC/77/9601-136|          N|      [N, Y, N, Y]|
|403787_ARC|BPC/77/9601-136|          Y|         [Y, N, Y]|
|403787_ARC|BPC/77/9601-136|          N|            [N, Y]|
|403787_ARC|BPC/77/9601-136|          Y|               [Y]|
+----------+---------------+-----------+------------------+
0 голосов
/ 28 августа 2018

Если вы можете ввести столбец «номер строки», вы, возможно, сможете выполнить запрос следующим образом. Псевдокод ниже (не проверял):

df.alias('df1').join(df.alias('df2'), 
  on=
     col('df1.customer')==col('df2.customer') 
     && col('df1.sku')==col('df2.sku') 
     && col('df1.rownum') <=col('df2.rownum') 
 )
.groupBy('df1.customer', 'df1.sku', 'df1.auto_create')
.agg(collect_list('df2.auto_create'))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...