Вот подробный подход, которому вы можете следовать:
from pyspark.sql.types import *
################
## Define Schema
################
# schema1 describes rows of (id, date, number). The date is declared as a
# string here; it is cast to a date type once the DataFrame exists.
schema1 = StructType(
    [
        StructField('id', IntegerType(), nullable=True),
        StructField('date', StringType(), nullable=True),
        StructField('number', IntegerType(), nullable=True),
    ]
)

# schema2 describes rows of (id, fromdate, todate); both range bounds are
# declared as strings for the same reason.
schema2 = StructType(
    [
        StructField('id', IntegerType(), nullable=True),
        StructField('fromdate', StringType(), nullable=True),
        StructField('todate', StringType(), nullable=True),
    ]
)
################
## Prepare Data
################
# Sample rows for the first DataFrame: (id, date, number).
data1 = [
    (101, '2018-12-01', 250),
    (101, '2018-12-02', 150),
    (102, '2018-11-25', 1000),
    (102, '2018-10-26', 2000),
    (102, '2018-09-25', 5000),
    (103, '2018-10-26', 200),
    (103, '2018-10-27', 2000),
]

# Sample rows for the second DataFrame: (id, fromdate, todate).
# id 104 has no counterpart in data1.
data2 = [
    (101, '2018-10-01', '2018-11-01'),
    (101, '2018-11-02', '2018-12-30'),
    (102, '2018-09-01', '2018-09-30'),
    (102, '2018-10-01', '2018-12-31'),
    (103, '2018-10-01', '2018-10-30'),
    (104, '2018-10-01', '2018-10-30'),
]
################
## Create dataframe and type cast to date
################
# Build both DataFrames from the raw tuples using the string-typed schemas.
df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)

# Re-select the columns, casting the date strings to the "date" type so
# they can be compared as dates in the join condition.
df1 = df1.select(
    df1["id"],
    df1["date"].cast("date"),
    df1["number"],
)
df2 = df2.select(
    df2["id"],
    df2["fromdate"].cast("date"),
    df2["todate"].cast("date"),
)
Определите условие соединения и объедините датафреймы:
################
## Define Joining Condition
################
# A df1 row matches a df2 row when the ids are equal and df1's date lies
# within df2's [fromdate, todate] range (between() includes both bounds).
cond = [
    df1["id"] == df2["id"],
    df1["date"].between(df2["fromdate"], df2["todate"]),
]
################
## Join dataframes using joining condition "cond" and aggregation
################
from pyspark.sql.functions import coalesce

# Left join from df2 so every (id, fromdate, todate) range is kept, then sum
# `number` per range. fillna(0) turns the null sum of a range with no
# matching df1 rows into 0 before the result is printed.
(
    df2.join(df1, cond, 'left')
    .select(df2.id, df1.number, df2.fromdate, df2.todate)
    .groupBy('id', 'fromdate', 'todate')
    .sum('number')
    .fillna(0)
    .show()
)