Я работаю над вариантом использования удаления дубликатов записей из входящих структурированных данных (в виде файлов CSV в папке на HDFS).Чтобы попробовать этот вариант использования, я написал пример кода с использованием опции файлов, чтобы посмотреть, можно ли удалить дубликаты из записей, которые присутствуют в CSV, которые копируются в папку (HDFS).
Найдите под кодовым фрагментом:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
import spark.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
val userSchema = new StructType()
.add("prod_code", "string")
.add("bal", "integer")
.add("v_txn_id", "string")
.add("timestamp", "Timestamp")
val csvDF = spark.readStream.option("sep", ",")
.schema(userSchema)
.csv("/user/Temp")
csvDF.dropDuplicates("v_txn_id")
csvDF.createOrReplaceTempView("table1")
val dbDf2 = spark.sql("select prod_code, bal, v_txn_id, current_timestamp timestamp from table1")
dbDf2.writeStream.queryName("aggregates").outputMode("update").format("memory").start()
spark.sql("select * from aggregates").show();
Теперь, когда я копирую файл в папку с дублирующимися записями (по v_txn_id), я все еще вижу, что получатель получает все строки изфайл:
P1,1000,TXNID1
P1,2000,TXNID2
P1,3000,TXNID2
P1,4000,TXNID3
P1,5000,TXNID3
P1,6000,TXNID4
Все эти строки в CSV-файле перемещаются в результат "агрегаты".То, что я ожидаю, это:
P1,1000,TXNID1
P1,3000,TXNID2
P1,5000,TXNID3
P1,6000,TXNID4
Это первый раз, когда я пытаюсь структурированной потоковой передачи (с состоянием), так что простите меня за тривиальный вопрос.Любые предложения очень помогут.