Агрегирующий столбец на основе столбца даты и времени другой таблицы PySpark / SQL - PullRequest
2 голосов
/ 17 апреля 2020

В настоящее время я пытаюсь выполнить агрегацию в зависимости от даты для столбца таблицы, используя даты из другой таблицы. Таблица 1 содержит идентификаторы пользователей и даты (а также другую информацию, которая не должна агрегироваться). Таблица 2 содержит значения I w sh для агрегирования с теми же идентификаторами и разными датами.

Цель состоит в агрегировании значений из таблицы 2 только , если они предшествуют дате в строка в таблице 1.

На приведенной ниже иллюстрации желаемой функцией агрегирования является «среднее», однако, если можно обеспечить общее решение PySpark (или SQL), которое позволяет этой функции агрегирования быть либо встроенной в (F.mean, F.sum) или пользовательская пользовательская функция, которая была бы идеальной.

Таблица 1 - Таблица даты (примечание: идентификаторы пользователя может повторяться в обеих таблицах)

+---+---------- +----------          
|USER|   DATE   |USER_STATE|   
+---+---------- +---------- 
|  3 | 7/1/2019 |  Current |
|  1 | 6/9/2019 |  Expired |
|  1 | 1/1/2019 |  Current |
+----+----------+----------- 

Таблица 2 - Таблица агрегации

+---+---------- +----------          
|USER|CHARGEDATE|AMOUNTPAID|   
+---+---------- +---------- 
|  1 | 7/1/2018 |  10.00   |
|  1 | 5/1/2019 |  40.00   |
|  1 | 2/2/2019 |  10.00   |
|  3 | 1/2/2019 |  15.00   |
+----+----------+----------- 

Желаемый результат - агрегация (среднее значение) рассчитывается для каждого пользователя и зависит от CHARGEDATE до DATE в таблице 1

+---+---------- +----------+---------------          
|USER|   DATE   |USER_STATE|MEAN_AMOUNTPAID|   
+---+---------- +----------+--------------- 
|  3 | 7/1/2019 |  Current |    15.00      |
|  1 | 6/9/2019 |  Expired |    20.00      | 
|  1 | 1/1/2019 |  Current |    10.00      |
+----+----------+----------+--------------- 
Row 2 - includes all user 1 Table 2 values because all ChargedDate< date 
Row 3 - includes only includes user 1's row 1 Table 2 value because it's the only chargeddate less than date

Я знаю, что это может быть неэффективно выполнено путем запуска al oop для каждой строки в таблице 1, получая DATE для эту строку, и использовать его для запроса второй таблицы. Я ищу решение без петель, если это возможно. Заранее спасибо!

Ответы [ 2 ]

4 голосов
/ 17 апреля 2020

PySpark способ сделать это включает преобразование ваших DATE и CHARGEDATE столбцов в DateType в состоянии filter на DATE>CHARGEDATE. Поэтому я предположил, что ваша дата была в формате "M/d/yyyy", если это было наоборот, просто замените его на "d/M/yyyy"

#data.show()
#+----+--------+----------+
#|USER|    DATE|USER_STATE|
#+----+--------+----------+
#|   3|7/1/2019|   Current|
#|   1|6/9/2019|   Expired|
#|   1|1/1/2019|   Current|
#+----+--------+----------+

#aggregation.show()
#+----+----------+----------+
#|USER|CHARGEDATE|AMOUNTPAID|
#+----+----------+----------+
#|   1|  7/1/2018|      10.0|
#|   1|  5/1/2019|      40.0|
#|   1|  2/2/2019|      10.0|
#|   3|  1/2/2019|      15.0|
#+----+----------+----------+

from pyspark.sql import functions as F
data.join(aggregation,['USER'])\
      .withColumn("DATE",F.to_date("DATE","M/d/yyyy"))\
      .withColumn("CHARGEDATE", F.to_date("CHARGEDATE", "M/d/yyyy"))\
      .filter("DATE>CHARGEDATE")\
      .groupBy("USER","DATE","USER_STATE").agg(F.mean("AMOUNTPAID").alias("mean_amount_paid"))\
      .show()

+----+----------+----------+----------------+
|USER|      DATE|USER_STATE|mean_amount_paid|
+----+----------+----------+----------------+
|   1|2019-06-09|   Expired|            20.0|
|   1|2019-01-01|   Current|            10.0|
|   3|2019-07-01|   Current|            15.0|
+----+----------+----------+----------------+
1 голос
/ 17 апреля 2020

Попробуйте следующее, а также проверьте здесь sqlfiddle

select
  d.users,
  date,
  user_state,
  avg(amount) as mean_amount_paid
from data d
join aggregation a
on d.users = a.users
where d.date > a.ch_date
group by
  d.users,
  date,
  user_state

Вывод:

|users | date | user_state | mean
+-------------------------------+
  1  2019-01-01  Current      10
  1  2019-06-09  Expired      20
  3  2019-07-01  Current      15
+-------------------------------+
...