Spark SQL - сохранить один результат после объединения - PullRequest
2 голосов
/ 17 апреля 2020

У меня есть два фрейма данных, которые я пытаюсь объединить, и я понял, что с моей первоначальной реализацией я получаю нежелательные результаты:

// plain_txns_df.show(false)
+------------+---------------+---------------+-----------+
|txn_date_at |account_number |merchant_name  |txn_amount |
+------------+---------------+---------------+-----------+
|2020-04-08  |1234567        |Starbucks      |2.02       |
|2020-04-14  |1234567        |Starbucks      |2.86       |
|2020-04-14  |1234567        |Subway         |12.02      |
|2020-04-14  |1234567        |Amazon         |3.21       |
+------------+---------------+---------------+-----------+
// richer_txns_df.show(false)
+----------+-------+----------------------+-------------+
|TXN_DT    |ACCT_NO|merch_name            |merchant_city|
+----------+-------+----------------------+-------------+
|2020-04-08|1234567|Subway                |Toronto      |
|2020-04-14|1234567|Subway                |Toronto      |
+----------+-------+----------------------+-------------+

Из двух вышеупомянутых фреймов моя цель состоит в том, чтобы обогатить простые транзакции городом торговца для транзакций, которые находятся в пределах 7-дневного окна (т. е. дата транзакции из более богатого фрейма данных транзакции должна быть между простой датой и простой датой - 7 дней.

Сначала я думал это было довольно просто и соединило данные следующим образом (диапазонное соединение, которое я знаю):

spark.sql(
    """
      | SELECT
      | plain.txn_date_at,
      | plain.account_number,
      | plain.merchant_name,
      | plain.txn_amount,
      | richer.merchant_city
      | FROM plain_txns_df plain
      | LEFT JOIN richer_txns_df richer
      | ON plain.account_number = richer.ACCT_NO
      | AND plain.merchant_name = richer.merch_name
      | AND richer.txn_date BETWEEN date_sub(plain.txn_date_at, 7) AND plain.txn_date_at
    """.stripMargin)

Однако, используя вышеизложенное, я получаю дублированные результаты для транзакции 14 апреля, потому что данные и учетная запись продавца детали соответствуют более богатой записи из восьмого и вписываются в диапазон дат:

+------------+---------------+---------------+-----------+-------------+
|txn_date_at |account_number |merchant_name  |txn_amount |merchant_city|
+------------+---------------+---------------+-----------+-------------+
|2020-04-08  |1234567        |Starbucks      |2.02       |Toronto      |
|2020-04-14  |1234567        |Starbucks      |2.86       |Toronto      | // Apr-08 Richer record
|2020-04-14  |1234567        |Starbucks      |2.86       |Toronto      |
+------------+---------------+---------------+-----------+-------------+

Есть ли способ получить только одну запись для каждого значения в моем простом DataFrame (т.е. получить одна запись для 14-го числа в приведенном выше наборе результатов)? Я попытался запустить отдельный после объединения, в котором Я решаю эту проблему, но я понимаю, что если в один и тот же день будет совершено две транзакции для одного и того же продавца, я их потеряю.

Я думал о перемещении более богатой таблицы в подзапрос и последующем применении фильтра даты внутри него, но я не знаю, как передать значение фильтра даты транзакции в этот запрос :(. Что-то вроде следующего , но не распознает простую дату транзакции:

spark.sql(
    """
      | SELECT
      | plain.txn_date_at,
      | plain.account_number,
      | plain.merchant_name,
      | plain.txn_amount,
      | richer2.merchant_city
      | FROM plain_txns_df plain
      | LEFT JOIN ( 
      |    SELECT ACCT_NO, merch_name from richer_txns_df
      |    WHERE txn_date BETWEEN date_sub(plain.txn_date_at, 7) AND plain.txn_date_at
      | ) richer2
      | ON plain.account_number = richer2.ACCT_NO
      | AND plain.merchant_name = richer2.merch_name
    """.stripMargin)

1 Ответ

1 голос
/ 17 апреля 2020

Первое, что мне нужно сделать, - это создать уникальный ключ на plain_txns_df, который позволяет отличать guish строк друг от друга при попытке их агрегировать / сравнивать.

import org.apache.spark.sql.functions._
plainDf.withColumn("id", monotonically_increasing_id())

С этим вы можете продолжить и выполнить первый отправленный вами запрос (плюс столбец id), который возвращает дубликаты:

spark.sql("""
    SELECT
    plain.id,
    plain.txn_date_at,
    plain.account_number,
    plain.merchant_name,
    plain.txn_amount,
    richer.merchant_city,
    richer.txn_dt
    FROM plain_txns_df plain
    INNER JOIN richer_txns_df richer
    ON plain.account_number = richer.acc_no
    AND plain.merchant_name = richer.merch_name
    AND richer.txn_dt BETWEEN date_sub(plain.txn_date_at, 7) AND plain.txn_date_at
  """.stripMargin).createOrReplaceTempView("foo")

Далее выполняется дедупликация над данным фреймом при получении записи. с последней richer_txns_df.txn_dt датой для данного id.

spark.sql("""
    SELECT
    f1.txn_date_at,
    f1.account_number,
    f1.merchant_name,
    f1.txn_amount,
    f1.merchant_city
    FROM foo f1
    LEFT JOIN foo f2
    ON f2.id = f1.id
    AND f2.txn_dt > f1.txn_dt
    WHERE f2.id IS NULL
  """.stripMargin).show
...