Spark Java DataFrame сумма и удалить дубликаты на основе столбцов - PullRequest
0 голосов
/ 06 июня 2018

У меня есть фрейм данных искры, как показано ниже:

INPUT

+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
| accountId|accountNumber|acctNumberTypeCode|cisDivision|currencyCode|priceItemCd|priceItemParam|priceItemParamCode|processingDate|txnAmt|  txnDttm|txnVol|udfChar1|  udfChar2|  udfChar3|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|

Теперь я хочу выполнить,

  1. СуммаСтолбец «txnAmt» для записей, имеющих одинаковые accountId и номера счетов.
  2. Удалить дубликаты записей.

Вывод

+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
| accountId|accountNumber|acctNumberTypeCode|cisDivision|currencyCode|priceItemCd|priceItemParam|priceItemParamCode|processingDate|txnAmt|  txnDttm|txnVol|udfChar1|  udfChar2|  udfChar3|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|   200|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|   200|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|

IЯ не уверен, как выполнить шаг 1?

Я написал код для выполнения шага 2, отбросьте дубликаты на основе accountId и номеров счетов:

String[] colNames = {"accountId", "accountNumber"};
Dataset<RuleOutputParams> finalDs = rulesParamDS.dropDuplicates(colNames);

Кто-нибудь может помочь?

1 Ответ

0 голосов
/ 06 июня 2018

Загрузить данные и создать для них таблицу SQL

val df = spark.read.format("csv").option("header", true).load("data.csv")
df.createOrReplaceTempView("t")

Затем то, что вам нужно, называется Функции агрегации окон , плюс трюк с row_number() для удаления дубликатов

val df2 = spark.sql("""SELECT * FROM (
  SELECT *, 
    sum(txnAmt) OVER (PARTITION BY accountId, accountNumber) s, 
    row_number() OVER (PARTITION BY accountId, accountNumber ORDER BY processingDate) r FROM t) 
  WHERE r=1""")
  .drop("txnAmt", "r")
  .withColumnRenamed("s", "txnAmt")

И если вы покажете это, вы увидите

+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+---------+------+--------+----------+----------+------+
| accountId|accountNumber|acctNumberTypeCode|cisDivision|currencyCode|priceItemCd|priceItemParam|priceItemParamCode|processingDate|  txnDttm|txnVol|udfChar1|  udfChar2|  udfChar3|txnAmt|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+---------+------+--------+----------+----------+------+
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2| 200.0|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2| 200.0|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+---------+------+--------+----------+----------+------+

В качестве примечания, можно попробовать добавить дополнительные столбцы к следующему, но вам потребуетсядобавить их в группу по пункту

spark.sql("SELECT accountId, accountNumber, SUM(txnAmt) txnAmt FROM t GROUP BY accountId, accountNumber").show
+----------+-------------+------+
| accountId|accountNumber|txnAmt|
+----------+-------------+------+
|2032000000|   2032000000| 200.0|
|1322000000|   1322000000| 200.0|
+----------+-------------+------+
...