Как выбрать столбцы с помощью Dynami c запрос выбора с помощью оконной функции - PullRequest
0 голосов
/ 05 августа 2020

У меня есть образец входного кадра данных, как показано ниже, но столбцы значений (clm, начинающиеся с m) могут быть n числом.

customer_id|month_id|m1  |m2 |m3 .......m_n
1001       |  01    |10  |20    
1002       |  01    |20  |30    
1003       |  01    |30  |40
1001       |  02    |40  |50    
1002       |  02    |50  |60    
1003       |  02    |60  |70
1001       |  03    |70  |80    
1002       |  03    |80  |90    
1003       |  03    |90  |100

Теперь мне нужно создать новые столбцы на основе совокупной суммы путем группировки каждый месяц. Следовательно, я использовал оконную функцию. Поскольку у меня будет n столбцов вместо withColumn для l oop, мне нужно динамически создать запрос или список и передать его в selectExpr для вычисления новых столбцов.

Например:

rownum_window = (Window.partitionBy("partner_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0))
df = df.select("*", F.sum(col("m1")).over(rownum_window).alias("n1"))

Но я хочу подготовить выражение Dynami c, а затем мне нужно перейти к выбору фрейма данных. Как я могу это сделать?

LIKE: expr = ["F.sum(col("m1")).over(rownum_window).alias("n1")", "F.sum(col("m2")).over(rownum_window).alias("n2")", "F.sum(col("m3")).over(rownum_window).alias("n3")", .......]
df = df.select("*', expr)

Или любым другим способом выбора фрейма данных, я могу создать выражение выбора?

Вывод:

customer_id|month_id|m1     |m2    |n1   |n2  
1001       |  01    |10     |20    |10   |20  
1002       |  01    |20     |30    |20   |30  
1003       |  01    |30     |40    |30   |40  
1001       |  02    |40     |50    |50   |70  
1002       |  02    |50     |60    |70   |90
1003       |  02    |60     |70    |90   |110  
1001       |  03    |70     |80    |120  |150
1002       |  03    |80     |90    |150  |180
1003       |  03    |90     |100   |180  |210

Ответы [ 2 ]

1 голос
/ 06 августа 2020

с небольшой модификацией предложения @Lamanus, приведенный ниже код может быть полезен для решения вашей проблемы,

# pyspark --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 1
from pyspark.sql import Row
from pyspark.sql.functions import *
from pyspark.sql.window import Window

drow = Row("customer_id","month_id","m1","m2","m3","m4")
data=[drow("1001","01","10","20","10","20"),drow("1002","01","20","30","20","30"),drow("1003","01","30","40","30","40"),drow("1001","02","40","50","40","50"),drow("1002","02","50","60","50","60"),drow("1003","02","60","70","60","70"),drow("1001","03","70","80","70","80"),drow("1002","03","80","90","80","90"),drow("1003","03","90","100","90","100")]

df = spark.createDataFrame(data)
df.show()
'''
+-----------+--------+---+---+---+---+
|customer_id|month_id| m1| m2| m3| m4|
+-----------+--------+---+---+---+---+
|       1001|      01| 10| 20| 10| 20|
|       1002|      01| 20| 30| 20| 30|
|       1003|      01| 30| 40| 30| 40|
|       1001|      02| 40| 50| 40| 50|
|       1002|      02| 50| 60| 50| 60|
|       1003|      02| 60| 70| 60| 70|
|       1001|      03| 70| 80| 70| 80|
|       1002|      03| 80| 90| 80| 90|
|       1003|      03| 90|100| 90|100|
+-----------+--------+---+---+---+---+
'''


a = ["m1","m2"]
b = ["m3","m4"]
rownum_window = (Window.partitionBy("customer_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0))
expr = ["*",sum(col("m1")).over(rownum_window).alias("sum1"), sum(col("m2")).over(rownum_window).alias("sum2"),avg(col("m3")).over(rownum_window).alias("avg1"), avg(col("m4")).over(rownum_window).alias("avg2") ]
df.select(expr).show()

'''
+-----------+--------+---+---+---+---+-----+-----+----+----+
|customer_id|month_id| m1| m2| m3| m4| sum1| sum2|avg1|avg2|
+-----------+--------+---+---+---+---+-----+-----+----+----+
|       1003|      01| 30| 40| 30| 40| 30.0| 40.0|30.0|40.0|
|       1003|      02| 60| 70| 60| 70| 90.0|110.0|45.0|55.0|
|       1003|      03| 90|100| 90|100|180.0|210.0|60.0|70.0|
|       1002|      01| 20| 30| 20| 30| 20.0| 30.0|20.0|30.0|
|       1002|      02| 50| 60| 50| 60| 70.0| 90.0|35.0|45.0|
|       1002|      03| 80| 90| 80| 90|150.0|180.0|50.0|60.0|
|       1001|      01| 10| 20| 10| 20| 10.0| 20.0|10.0|20.0|
|       1001|      02| 40| 50| 40| 50| 50.0| 70.0|25.0|35.0|
|       1001|      03| 70| 80| 70| 80|120.0|150.0|40.0|50.0|
+-----------+--------+---+---+---+---+-----+-----+----+----+
'''
1 голос
/ 06 августа 2020

ОБНОВЛЕНО:

import pyspark.sql.functions as F
from pyspark.sql import Window

rownum_window = Window.partitionBy("customer_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0)

expr = [F.sum(F.col("m1")).over(rownum_window).alias("n1"), F.sum(F.col("m2")).over(rownum_window).alias("n2")]
df.select('*', *expr) \
  .orderBy('month_id', 'customer_id') \
  .show(10, False)

+-----------+--------+---+---+---+---+
|customer_id|month_id|m1 |m2 |n1 |n2 |
+-----------+--------+---+---+---+---+
|1001       |1       |10 |20 |10 |20 |
|1002       |1       |20 |30 |20 |30 |
|1003       |1       |30 |40 |30 |40 |
|1001       |2       |40 |50 |50 |70 |
|1002       |2       |50 |60 |70 |90 |
|1003       |2       |60 |70 |90 |110|
|1001       |3       |70 |80 |120|150|
|1002       |3       |80 |90 |150|180|
|1003       |3       |90 |100|180|210|
+-----------+--------+---+---+---+---+

Попробуйте это.

expr = [F.sum(col("m1")).over(rownum_window).alias("n1"), F.sum(col("m2")).over(rownum_window).alias("n2"), ...]
df = df.select('*', *expr)
...