PySpark: объединение фреймов данных, в которых одно значение (из 1-го фрейма данных) находится между двумя другими (из 2-го фрейма данных) - PullRequest
1 голос
/ 04 апреля 2019

Мне нужно объединить два кадра данных по идентификатору и условию, когда дата в одном кадре данных находится между двумя датами в другом кадре данных и сгруппировать (вычислить сумму) другого столбца

Дата-кадр A имеет дату («дата»), номер («число») и идентификатор («id»):

| id    | date          | number | 

| 101   |  2018-12-01   | 250  |
| 101   |  2018-12-02   | 150  | 
| 102   |  2018-11-25   | 1000 | 
| 102   |  2018-10-26   | 2000 | 
| 102   |  2018-09-25   | 5000 | 
| 103   |  2018-10-26   | 200  | 
| 103   |  2018-10-27   | 2000 | 

Фрейм данных B имеет Id ("id"), fromdate ("fromdate") и todate ("todate"):

| id    |    fromdate   | todate     | 

| 101   |  2018-10-01   | 2018-11-01 |
| 101   |  2018-11-02   | 2018-12-30 | 
| 102   |  2018-09-01   | 2018-09-30 | 
| 102   |  2018-10-01   | 2018-12-31 | 
| 103   |  2018-10-01   | 2018-10-30 | 
| 104   |  2018-10-01   | 2018-10-30 | 

Теперь мне нужно объединить эти два кадра данных по идентификатору и дате, а затем суммировать все числа соответственно. Например: Рассмотрим четвертую строку в кадре данных B для идентификатора 102, и между этими датами у нас есть две соответствующие строки (строка № 3,4) из кадра данных Am. Объедините их, вычислив сумму.

Таким образом, результирующая строка будет

| id    |    fromdate   | todate     | sum  |

| 102   |  2018-10-01   | 2018-12-31 | 3000 |

Конечный результат должен быть: | id | от даты | в настоящее время | сумма |

| 101   |  2018-10-01   | 2018-11-01 | 0      |
| 101   |  2018-11-02   | 2018-12-30 | 400    |
| 102   |  2018-09-01   | 2018-09-30 | 5000   |
| 102   |  2018-10-01   | 2018-12-31 | 3000   |
| 103   |  2018-10-01   | 2018-10-30 | 2200   |
| 104   |  2018-10-01   | 2018-10-30 | 0      |

1 Ответ

2 голосов
/ 04 апреля 2019

Вот подробный подход, которому вы можете следовать -

from pyspark.sql.types import * 

################
##Define Schema
################
schema1 = StructType([StructField('id', IntegerType(), True),
                     StructField('date', StringType(), True),
                     StructField('number', IntegerType(), True)
                     ]
                    )


schema2 = StructType([StructField('id', IntegerType(), True),
                     StructField('fromdate', StringType(), True),
                     StructField('todate', StringType(), True)
                     ]
                    )
################
##Prepare Data
################

data1  = [
(101,'2018-12-01',250 ),
(101,'2018-12-02',150 ), 
(102,'2018-11-25',1000), 
(102,'2018-10-26',2000), 
(102,'2018-09-25',5000), 
(103,'2018-10-26',200 ), 
(103,'2018-10-27',2000)
]

data2 = [
(101,'2018-10-01','2018-11-01'),
(101,'2018-11-02','2018-12-30'), 
(102,'2018-09-01','2018-09-30'), 
(102,'2018-10-01','2018-12-31'), 
(103,'2018-10-01','2018-10-30'), 
(104,'2018-10-01','2018-10-30')
]

################
##Create dataframe and type cast to date
################

df1 = spark.createDataFrame(data1, schema1)

df2 = spark.createDataFrame(data2, schema2)

df1 = df1.select(df1.id,df1.date.cast("date"),df1.number)

df2 = df2.select(df2.id,df2.fromdate.cast("date"),df2.todate.cast("date"))

Определить условие соединения и объединить кадры данных

################
##Define Joining Condition
################

cond = [df1.id == df2.id, df1.date.between(df2.fromdate,df2.todate)]

################
##Join dataframes using joining condition "cond" and aggregation
################

from pyspark.sql.functions  import coalesce

df2.\
    join(df1, cond,'left').\
    select(df2.id,df1.number,df2.fromdate,df2.todate).\
    groupBy('id','fromdate','todate').\
    sum('number').fillna(0).\
    show()
...