Дедуплицирующий поток теряет события Spark, Eventhubs в Databricks - PullRequest
0 голосов
/ 26 марта 2020

Я работал над получением потока событий из Azure EventHubs, но я не хочу работать с потенциальными дубликатами, которые компонент посылает время от времени, я следовал за некоторыми примерами того, как дедуплицировать такие поток данных, записывающий foreachBatch в моем коде, который записывает в приемник.

Что я делаю в целом, так это:

val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

val schema = new StructType() 
    .add("col1", StringType)
    .add("col2", StringType)
    .add("coln", StringType)


val data = incomingStream
    .select($"body".cast("string"))
    .select(from_json($"body", schema) as "body")
    .select($"body.*")

display(data)


val deltaTable = DeltaTable.forName("target_table")

def upsertToDelta(df: DataFrame, batchId: Long) {
  //   df.persist()

  //   df.write.format("delta").mode("append").save("log_table")
  //   Thread.sleep(500)
  val unique_batch = df.groupBy($"col3", $"col4") 
                    .agg(last("col_name_1").alias("col1"),
                         last("col_name_2").alias("col2"),
                         ...)

  deltaTable.as("sink")
    .merge(
      unique_batch.as("stream"),
      "sink.col1 = stream.col1 AND sink.col2 = stream.col2")
    .whenNotMatched().insertAll()
    .execute()

  //df.unpersist()

}


data
  .writeStream
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .option("checkpointLocation", "/checkpoints/ingest")
  .start()

В результате мои данные действительно дедуплицированы, но верно и то, что некоторые события никогда не вставляются в таблицу, я пытался опускание до 2 разных таблиц, чтобы проверить, приходили ли события в поток (с моими закомментированными строками кода постоянства и записи в дельта-таблицу), и когда я делаю, события фактически завершены, и таблица записывает себя отлично (почему?), но Я не понимаю причину, а также я не думаю, что необходимо писать вторичный приемник для дедуплицированных записей. Я попытался выяснить, сработало ли это, потому что моя строка кода, которая записывает в приемник, изменяет мой фрейм данных или причина связана со временем, которое требуется для написания пакета, что помогает правильно синхронизировать пакет, поэтому я попытался перевести в режим сна 2 секунд без записи в мою таблицу журналов, и это снова заработало! (но почему ?!) Из всего этого я понимаю, что я не понимаю какой-то фундаментальный принцип, лежащий в основе искры, потоковой передачи или фреймов данных, и что мне не хватает элементарного шага для правильного процесса. Если у кого-то есть какое-то понимание, оно будет принято счастливо.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...