Группировать и фильтровать фрейм данных Pyspark - PullRequest
1 голос
/ 04 октября 2019

У меня есть фрейм данных PySpark с 3 столбцами. Некоторые строки похожи в 2 столбцах, но не в третьем, см. Пример ниже.

----------------------------------------
first_name | last_name | requests_ID    |
----------------------------------------
Joe        | Smith     |[2,3]           |
---------------------------------------- 
Joe        | Smith     |[2,3,5,6]       |
---------------------------------------- 
Jim        | Bush      |[9,7]           |
---------------------------------------- 
Jim        | Bush      |[21]            |
---------------------------------------- 
Sarah      | Wood      |[2,3]           |
----------------------------------------   

Я хочу сгруппировать строки по столбцам {first_name, last_name} и иметь только строку с максимальнымколичество {запросов_ID}. Таким образом, результаты должны быть:

----------------------------------------
first_name | last_name | requests_ID    |
----------------------------------------
Joe        | Smith     |[2,3,5,6]       |
---------------------------------------- 
Jim        | Bush      |[9,7]           |
---------------------------------------- 
Sarah      | Wood      |[2,3]           |
---------------------------------------- 

Я пробую разные вещи, например, следующие, но он дает мне вложенный массив обеих строк в группе и не самый длинный.

gr_df = filtered_df.groupBy("first_name", "last_name").agg(F.collect_set("requests_ID").alias("requests_ID")) 

Вот результаты, которые я получаю:

----------------------------------------
first_name | last_name | requests_ID    |
----------------------------------------
Joe        | Smith     |[[9,7],[2,3,5,6]]|
---------------------------------------- 
Jim        | Bush      |[[9,7],[21]]    |
---------------------------------------- 
Sarah      | Wood      |[2,3]           |
---------------------------------------- 

Ответы [ 2 ]

1 голос
/ 04 октября 2019

Вы можете использовать size, чтобы определить длину столбца массива, и использовать window, как показано ниже:

Импорт и создание образца DataFrame

import pyspark.sql.functions as f
from pyspark.sql.window import Window

df = spark.createDataFrame([('Joe','Smith',[2,3]),
('Joe','Smith',[2,3,5,6]),
('Jim','Bush',[9,7]),
('Jim','Bush',[21]),
('Sarah','Wood',[2,3])], ('first_name','last_name','requests_ID'))

Определение окна для получения строкиномер столбца requests_ID в зависимости от длины столбца в порядке убывания.

Здесь f.size("requests_ID") даст длину столбца requests_ID, а desc() отсортирует ее в порядке убывания.

w_spec = Window().partitionBy("first_name", "last_name").orderBy(f.size("requests_ID").desc())

Примените оконную функцию и получите первую строку.

df.withColumn("rn", f.row_number().over(w_spec)).where("rn ==1").drop("rn").show()
+----------+---------+------------+
|first_name|last_name| requests_ID|
+----------+---------+------------+
|       Jim|     Bush|      [9, 7]|
|     Sarah|     Wood|      [2, 3]|
|       Joe|    Smith|[2, 3, 5, 6]|
+----------+---------+------------+
1 голос
/ 04 октября 2019

Чтобы выполнить текущее значение df, которое выглядит следующим образом,

----------------------------------------
first_name | last_name | requests_ID    |
----------------------------------------
Joe        | Smith     |[[9,7],[2,3,5,6]]|
---------------------------------------- 
Jim        | Bush      |[[9,7],[21]]    |
---------------------------------------- 
Sarah      | Wood      |[2,3]           |
---------------------------------------- 

попробуйте это,

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, ArrayType

def myfunc(x):
  temp = []
  for _ in x:
    temp.append(len(x))

  max_ind = temp.index(max(temp))

  return x[max_ind]

udf_extract = F.udf(myfunc, ArrayType(IntegerType()))

df = df.withColumn('new_requests_ID', udf_extract('requests_ID'))

#df.show()

или, альтернативно, без объявления переменной,

import pyspark.sql.functions as F

@F.udf
def myfunc(x):
  temp = []
  for _ in x:
    temp.append(len(x))

  max_ind = temp.index(max(temp))

  return x[max_ind]

df = df.withColumn('new_requests_ID', myfunc('requests_ID'))

#df.show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...