Я работал над получением потока событий из Azure EventHubs, но я не хочу работать с потенциальными дубликатами, которые компонент посылает время от времени, я следовал за некоторыми примерами того, как дедуплицировать такие поток данных, записывающий foreachBatch в моем коде, который записывает в приемник.
Что я делаю в целом, так это:
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
val schema = new StructType()
.add("col1", StringType)
.add("col2", StringType)
.add("coln", StringType)
val data = incomingStream
.select($"body".cast("string"))
.select(from_json($"body", schema) as "body")
.select($"body.*")
display(data)
val deltaTable = DeltaTable.forName("target_table")
def upsertToDelta(df: DataFrame, batchId: Long) {
// df.persist()
// df.write.format("delta").mode("append").save("log_table")
// Thread.sleep(500)
val unique_batch = df.groupBy($"col3", $"col4")
.agg(last("col_name_1").alias("col1"),
last("col_name_2").alias("col2"),
...)
deltaTable.as("sink")
.merge(
unique_batch.as("stream"),
"sink.col1 = stream.col1 AND sink.col2 = stream.col2")
.whenNotMatched().insertAll()
.execute()
//df.unpersist()
}
data
.writeStream
.foreachBatch(upsertToDelta _)
.outputMode("update")
.option("checkpointLocation", "/checkpoints/ingest")
.start()
В результате мои данные действительно дедуплицированы, но верно и то, что некоторые события никогда не вставляются в таблицу, я пытался опускание до 2 разных таблиц, чтобы проверить, приходили ли события в поток (с моими закомментированными строками кода постоянства и записи в дельта-таблицу), и когда я делаю, события фактически завершены, и таблица записывает себя отлично (почему?), но Я не понимаю причину, а также я не думаю, что необходимо писать вторичный приемник для дедуплицированных записей. Я попытался выяснить, сработало ли это, потому что моя строка кода, которая записывает в приемник, изменяет мой фрейм данных или причина связана со временем, которое требуется для написания пакета, что помогает правильно синхронизировать пакет, поэтому я попытался перевести в режим сна 2 секунд без записи в мою таблицу журналов, и это снова заработало! (но почему ?!) Из всего этого я понимаю, что я не понимаю какой-то фундаментальный принцип, лежащий в основе искры, потоковой передачи или фреймов данных, и что мне не хватает элементарного шага для правильного процесса. Если у кого-то есть какое-то понимание, оно будет принято счастливо.