DropDuplicates не дает ожидаемого результата - PullRequest
0 голосов
/ 22 марта 2019

Я работаю над вариантом использования удаления дубликатов записей из входящих структурированных данных (в виде файлов CSV в папке на HDFS).Чтобы попробовать этот вариант использования, я написал пример кода с использованием опции файлов, чтобы посмотреть, можно ли удалить дубликаты из записей, которые присутствуют в CSV, которые копируются в папку (HDFS).

Найдите под кодовым фрагментом:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}


val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
val userSchema = new StructType()
    .add("prod_code", "string")
    .add("bal", "integer")
    .add("v_txn_id", "string")
    .add("timestamp", "Timestamp")

val csvDF = spark.readStream.option("sep", ",")
                            .schema(userSchema)
                            .csv("/user/Temp")
csvDF.dropDuplicates("v_txn_id")
csvDF.createOrReplaceTempView("table1")

val dbDf2 = spark.sql("select prod_code, bal, v_txn_id, current_timestamp timestamp from table1")
dbDf2.writeStream.queryName("aggregates").outputMode("update").format("memory").start()

spark.sql("select * from aggregates").show();

Теперь, когда я копирую файл в папку с дублирующимися записями (по v_txn_id), я все еще вижу, что получатель получает все строки изфайл:

P1,1000,TXNID1
P1,2000,TXNID2
P1,3000,TXNID2
P1,4000,TXNID3
P1,5000,TXNID3
P1,6000,TXNID4

Все эти строки в CSV-файле перемещаются в результат "агрегаты".То, что я ожидаю, это:

P1,1000,TXNID1
P1,3000,TXNID2
P1,5000,TXNID3
P1,6000,TXNID4

Это первый раз, когда я пытаюсь структурированной потоковой передачи (с состоянием), так что простите меня за тривиальный вопрос.Любые предложения очень помогут.

1 Ответ

0 голосов
/ 23 марта 2019

По вашему ожидаемому выводу, я считаю, что вам нужно найти максимум балла на основе столбцов prod_code и v_txn_id. Чтобы добиться результата, в вашей окончательной таблице aggregate вы можете использовать функцию окна (partition by), чтобы найти максимум bal на основе столбцов prod_code и v_txn_id, создав временный столбец с именем * 1008. *. Затем во внешнем запросе выберите отдельные значения на основе столбцов prod_code, temp_bal и v_txn_id.

spark.sql("select distinct prod_code,temp_bal as bal,v_txn_id from(select *,max(bal) over(partition by prod_code,v_txn_id) as temp_bal from aggregates) order by prod_code,v_txn_id").show()

enter image description here РЕДАКТИРОВАТЬ 1:

В соответствии с вашим требованием, пожалуйста, найдите приведенный ниже скрипт, который будет работать в соответствии с самой последней датой / временем для v_txn_id.

spark.sql("select distinct a.prod_code,a.bal,a.v_txn_id from aggregates a join (select distinct v_txn_id,max(timestamp) over(partition by v_txn_id) as temp_timestamp from aggregates) b on a.v_txn_id=b.v_txn_id and a.timestamp=b.temp_timestamp order by a.v_txn_id").show()

enter image description here

Пожалуйста, дайте мне знать, если у вас есть какие-либо вопросы, в противном случае отметьте этот ответ как принятый (отметьте значок).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...