У меня есть два фрейма данных, которые я пытаюсь объединить, и я понял, что с моей первоначальной реализацией я получаю нежелательные результаты:
// plain_txns_df.show(false)
+------------+---------------+---------------+-----------+
|txn_date_at |account_number |merchant_name |txn_amount |
+------------+---------------+---------------+-----------+
|2020-04-08 |1234567 |Starbucks |2.02 |
|2020-04-14 |1234567 |Starbucks |2.86 |
|2020-04-14 |1234567 |Subway |12.02 |
|2020-04-14 |1234567 |Amazon |3.21 |
+------------+---------------+---------------+-----------+
// richer_txns_df.show(false)
+----------+-------+----------------------+-------------+
|TXN_DT |ACCT_NO|merch_name |merchant_city|
+----------+-------+----------------------+-------------+
|2020-04-08|1234567|Subway |Toronto |
|2020-04-14|1234567|Subway |Toronto |
+----------+-------+----------------------+-------------+
Из двух вышеупомянутых фреймов моя цель состоит в том, чтобы обогатить простые транзакции городом торговца для транзакций, которые находятся в пределах 7-дневного окна (т. е. дата транзакции из более богатого фрейма данных транзакции должна быть между простой датой и простой датой - 7 дней.
Сначала я думал это было довольно просто и соединило данные следующим образом (диапазонное соединение, которое я знаю):
spark.sql(
"""
| SELECT
| plain.txn_date_at,
| plain.account_number,
| plain.merchant_name,
| plain.txn_amount,
| richer.merchant_city
| FROM plain_txns_df plain
| LEFT JOIN richer_txns_df richer
| ON plain.account_number = richer.ACCT_NO
| AND plain.merchant_name = richer.merch_name
| AND richer.txn_date BETWEEN date_sub(plain.txn_date_at, 7) AND plain.txn_date_at
""".stripMargin)
Однако, используя вышеизложенное, я получаю дублированные результаты для транзакции 14 апреля, потому что данные и учетная запись продавца детали соответствуют более богатой записи из восьмого и вписываются в диапазон дат:
+------------+---------------+---------------+-----------+-------------+
|txn_date_at |account_number |merchant_name |txn_amount |merchant_city|
+------------+---------------+---------------+-----------+-------------+
|2020-04-08 |1234567 |Starbucks |2.02 |Toronto |
|2020-04-14 |1234567 |Starbucks |2.86 |Toronto | // Apr-08 Richer record
|2020-04-14 |1234567 |Starbucks |2.86 |Toronto |
+------------+---------------+---------------+-----------+-------------+
Есть ли способ получить только одну запись для каждого значения в моем простом DataFrame (т.е. получить одна запись для 14-го числа в приведенном выше наборе результатов)? Я попытался запустить отдельный после объединения, в котором Я решаю эту проблему, но я понимаю, что если в один и тот же день будет совершено две транзакции для одного и того же продавца, я их потеряю.
Я думал о перемещении более богатой таблицы в подзапрос и последующем применении фильтра даты внутри него, но я не знаю, как передать значение фильтра даты транзакции в этот запрос :(. Что-то вроде следующего , но не распознает простую дату транзакции:
spark.sql(
"""
| SELECT
| plain.txn_date_at,
| plain.account_number,
| plain.merchant_name,
| plain.txn_amount,
| richer2.merchant_city
| FROM plain_txns_df plain
| LEFT JOIN (
| SELECT ACCT_NO, merch_name from richer_txns_df
| WHERE txn_date BETWEEN date_sub(plain.txn_date_at, 7) AND plain.txn_date_at
| ) richer2
| ON plain.account_number = richer2.ACCT_NO
| AND plain.merchant_name = richer2.merch_name
""".stripMargin)