сгруппировать и присоединиться против окна в pyspark - PullRequest
0 голосов
/ 04 февраля 2019

У меня есть фрейм данных в pyspark, в котором есть сотни миллионов строк (вот фиктивный пример):

import datetime
import pyspark.sql.functions as F
from pyspark.sql import Window,Row
from pyspark.sql.functions import col
from pyspark.sql.functions import month, mean,sum,year,avg
from pyspark.sql.functions import concat_ws,to_date,unix_timestamp,datediff,lit
from pyspark.sql.functions import when,min,max,desc,row_number,col

dg = sqlContext.createDataFrame(sc.parallelize([
Row(cycle_dt=datetime.datetime(1984, 5, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=0,cust_xref_id=10),
Row(cycle_dt=datetime.datetime(1984, 6, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=2,cust_xref_id=11),
Row(cycle_dt=datetime.datetime(1984, 7, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=2,cust_xref_id=12),
Row(cycle_dt=datetime.datetime(1984, 4, 2, 0, 0), network_id=4,norm_strength=0.5, spend_active_ind=1,net_spending_amt=2,cust_xref_id=13),
Row(cycle_dt=datetime.datetime(1983,11, 5, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=8,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1983,12, 2, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=2,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 1, 3, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=1,net_spending_amt=15,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 3, 2, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=7,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 4, 3, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=1,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 5, 2, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=0,net_spending_amt=1,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984,10, 6, 0, 0), network_id=1,norm_strength=0.5, spend_active_ind=1,net_spending_amt=10,cust_xref_id=1 ),
Row(cycle_dt=datetime.datetime(1984, 1, 7, 0, 0), network_id=1,norm_strength=0.4, spend_active_ind=0,net_spending_amt=8,cust_xref_id=2 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=1,norm_strength=0.4, spend_active_ind=0,net_spending_amt=3,cust_xref_id=2 ),
Row(cycle_dt=datetime.datetime(1984, 2, 7, 0, 0), network_id=1,norm_strength=0.4, spend_active_ind=1,net_spending_amt=5,cust_xref_id=2 ),
Row(cycle_dt=datetime.datetime(1985, 2, 7, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=1,net_spending_amt=8,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1985, 3, 7, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=0,net_spending_amt=2,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1985, 4, 7, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=1,net_spending_amt=1,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1985, 4, 8, 0, 0), network_id=1,norm_strength=0.3, spend_active_ind=1,net_spending_amt=9,cust_xref_id=3 ),
Row(cycle_dt=datetime.datetime(1984, 4, 2, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=3,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 4, 3, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=2,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=5,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 3, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=1,net_spending_amt=6,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 3, 2, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=2,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 5, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=0,net_spending_amt=9,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 6, 0, 0), network_id=2,norm_strength=0.5, spend_active_ind=1,net_spending_amt=1,cust_xref_id=4 ),
Row(cycle_dt=datetime.datetime(1984, 1, 7, 0, 0), network_id=2,norm_strength=0.4, spend_active_ind=0,net_spending_amt=7,cust_xref_id=5 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=2,norm_strength=0.4, spend_active_ind=0,net_spending_amt=8,cust_xref_id=5 ),
Row(cycle_dt=datetime.datetime(1984, 2, 7, 0, 0), network_id=2,norm_strength=0.4, spend_active_ind=1,net_spending_amt=3,cust_xref_id=5 ),
Row(cycle_dt=datetime.datetime(1985, 2, 7, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=1,net_spending_amt=6,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1985, 3, 7, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=0,net_spending_amt=9,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1985, 4, 7, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=1,net_spending_amt=4,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1985, 4, 8, 0, 0), network_id=2,norm_strength=0.6, spend_active_ind=1,net_spending_amt=6,cust_xref_id=6 ),
Row(cycle_dt=datetime.datetime(1984, 4, 2, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 4, 3, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 3, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 3, 2, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 5, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 6, 0, 0), network_id=3,norm_strength=0.5, spend_active_ind=0,net_spending_amt=0,cust_xref_id=7 ),
Row(cycle_dt=datetime.datetime(1984, 1, 7, 0, 0), network_id=3,norm_strength=0.4, spend_active_ind=0,net_spending_amt=3,cust_xref_id=8 ),
Row(cycle_dt=datetime.datetime(1984, 1, 2, 0, 0), network_id=3,norm_strength=0.4, spend_active_ind=0,net_spending_amt=2,cust_xref_id=8 ),
Row(cycle_dt=datetime.datetime(1984, 2, 7, 0, 0), network_id=3,norm_strength=0.4, spend_active_ind=1,net_spending_amt=8,cust_xref_id=8 ),
Row(cycle_dt=datetime.datetime(1985, 2, 7, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=1,net_spending_amt=4,cust_xref_id=9 ),
Row(cycle_dt=datetime.datetime(1985, 3, 7, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=0,net_spending_amt=1,cust_xref_id=9 ),
Row(cycle_dt=datetime.datetime(1985, 4, 7, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=1,net_spending_amt=9,cust_xref_id=9 ),
Row(cycle_dt=datetime.datetime(1985, 4, 8, 0, 0), network_id=3,norm_strength=0.6, spend_active_ind=0,net_spending_amt=3,cust_xref_id=9 )
]))

Я пытаюсь суммировать spend_active_ind для каждого cust_xref_id иоставьте те, у которых сумма больше нуля.Один из способов сделать это - использовать grouby и join:

dg1 = dg.groupby("cust_xref_id").agg(sum("spend_active_ind").alias("sum_spend_active_ind"))
dg1 = dg1.filter(dg1.sum_spend_active_ind != 0).select("cust_xref_id")
dg = dg.alias("t1").join(dg1.alias("t2"),col("t1.cust_xref_id")==col("t2.cust_xref_id")).select(col("t1.*"))

Другой способ, которым я могу думать об этом, - использовать window:

w = Window.partitionBy ('cust_xref_id')
dg = dg.withColumn('sum_spend_active_ind',sum(dg.spend_active_ind).over(w))
dg = dg.filter(dg.sum_spend_active_ind!=0)

, какой из этих методов (или любой другойметод) является более эффективным для того, что я пытаюсь сделать.Спасибо

...