PySpark DataFrame: найдите ближайшее значение и нарежьте DataFrame - PullRequest
1 голос
/ 27 марта 2019

Итак, я провел достаточно исследований и не нашел пост, в котором говорится о том, что я хочу сделать.

У меня есть PySpark DataFrame my_df, который sorted от value column-

+----+-----+                                                                    
|name|value|
+----+-----+
|   A|   30|
|   B|   25|
|   C|   20|
|   D|   18|
|   E|   18|
|   F|   15|
|   G|   10|
+----+-----+

Сумма всех значений в столбце value равна 136. Я хочу получить все строки, чьи combined values >= x% of 136. В этом примере, скажем, x=80. Тогда target sum = 0.8*136 = 108.8. Следовательно, новый DataFrame будет состоять из всех строк, которые имеют combined value >= 108.8.

В нашем примере это сводится к строке D (поскольку объединенные значения до D = 30+25+20+18 = 93).

Тем не менее, сложная часть заключается в том, что я также хочу включить сразу следующие строки с дублирующимися значениями. В этом случае я также хочу включить строку E, поскольку она имеет то же значение, что и строка D, т.е. 18.

Я хочу нарезать my_df, указав процентную переменную x, например, 80, как обсуждалось выше. Новый DataFrame должен состоять из следующих строк -

+----+-----+                                                                    
|name|value|
+----+-----+
|   A|   30|
|   B|   25|
|   C|   20|
|   D|   18|
|   E|   18|
+----+-----+

Одна вещь, которую я мог бы здесь сделать, - это перебирать DataFrame (which is ~360k rows), но я предполагаю, что это побеждает цель Spark.

Есть ли краткая функция для того, что я хочу здесь?

Ответы [ 2 ]

3 голосов
/ 27 марта 2019

Используйте SQL-функции pyspark, чтобы сделать это кратко.

result = my_df.filter(my_df.value > target).select(my_df.name,my_df.value)
result.show()

Редактировать: на основе вопроса редактирования ОП - Вычислить текущую сумму и получать строки, пока не будет достигнуто целевое значение. Обратите внимание, что это приведет к строкам до D, а не к E, что кажется странным требованием.

from pyspark.sql import Window
from pyspark.sql import functions as f

# Total sum of all `values`
target = (my_df.agg(sum("value")).collect())[0][0]

w = Window.orderBy(my_df.name) #Ideally this should be a column that specifies ordering among rows
running_sum_df = my_df.withColumn('rsum',f.sum(my_df.value).over(w))
running_sum_df.filter(running_sum_df.rsum <= 0.8*target)
2 голосов
/ 27 марта 2019

Ваши требования довольно строгие, поэтому сложно сформулировать эффективное решение вашей проблемы.Тем не менее, вот один из подходов:

Сначала вычислите совокупную сумму и общую сумму для столбца value и отфильтруйте DataFrame, используя указанное вами процентное значение целевого условия.Давайте назовем этот результат df_filtered:

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.orderBy(f.col("value").desc(), "name").rangeBetween(Window.unboundedPreceding, 0)
target = 0.8

df_filtered = df.withColumn("cum_sum", f.sum("value").over(w))\
    .withColumn("total_sum", f.sum("value").over(Window.partitionBy()))\
    .where(f.col("cum_sum") <= f.col("total_sum")*target)

df_filtered.show()
#+----+-----+-------+---------+
#|name|value|cum_sum|total_sum|
#+----+-----+-------+---------+
#|   A|   30|     30|      136|
#|   B|   25|     55|      136|
#|   C|   20|     75|      136|
#|   D|   18|     93|      136|
#+----+-----+-------+---------+

Затем присоедините этот отфильтрованный DataFrame к оригиналу в столбце value.Поскольку ваш DataFrame уже отсортирован по value, окончательный вывод будет содержать нужные вам строки.

df.alias("r")\
    .join(
    df_filtered.alias('l'),
    on="value"
).select("r.name", "r.value").sort(f.col("value").desc(), "name").show()
#+----+-----+
#|name|value|
#+----+-----+
#|   A|   30|
#|   B|   25|
#|   C|   20|
#|   D|   18|
#|   E|   18|
#+----+-----+

Столбцы total_sum и cum_sum вычисляются с использованием Window function .

Окно w размещается по убыванию столбца value, за которым следует столбец name.Столбец name используется для разрыва связей - без него обе строки C и D будут иметь одинаковую суммарную сумму 111 = 75+18+18, и вы неправильно потеряете их обоих в фильтре.

w = Window\                                     # Define Window
    .orderBy(                                   # This will define ordering
        f.col("value").desc(),                  # First sort by value descending
        "name"                                  # Sort on name second
    )\
    .rangeBetween(Window.unboundedPreceding, 0) # Extend back to beginning of window

rangeBetween(Window.unboundedPreceding, 0) указывает, что Окно должно включать все строки перед текущей строкой (определенной orderBy).Это то, что делает его накопительной суммой.

...