Как агрегировать данные микропакетов в фрейм данных для структурированной потоковой передачи искр? - PullRequest
0 голосов
/ 14 июля 2020

Пример использования следующий. Данные кафки загружаются на основе структурированной потоковой передачи искры. Мы надеемся, что данные для каждой микропакции каждые 10 секунд можно будет обрабатывать и агрегировать в единый фрейм данных, который продолжает отслеживать сумму некоторых значений для каждого идентификатора. Правильно ли следующий способ? Похоже, что старые данные из предыдущих микропакетов все еще находятся в «таблице мониторинга» для каждого запроса, что вызывает нежелательное агрегирование. Как лучше всего решить эту проблему? Спасибо!

    val monitoring_stream = df.writeStream
                              .outputMode("append")
                              .format("memory")
                              .queryName("monitoring_table")
                              .start()

      var rounds = 0
      
      while(monitoring_stream.isActive) {
          Thread.sleep(10000)
  
          spark.sql("SELECT * from monitoring_table").show()   
          var tempDF = spark.sql("SELECT * from monitoring_table")
          var batchDF_group = tempDF.withWatermark("timestamp", "10 seconds").groupBy("id").sum("download_volume", "upload_volume").withColumnRenamed("sum(download_volume)","total_download_volume_batch").withColumnRenamed("sum(upload_volume)","total_upload_volume_batch")
          monitoring_df = monitoring_df.join(batchDF_group, monitoring_df("id") === batchDF_group("id"), "left").select(monitoring_df("id"), monitoring_df("total_download_volume"), monitoring_df("upload_volume"), monitoring_df("total_volume"), batchDF_group("total_download_volume_batch"), batchDF_group("total_upload_volume_batch")).na.fill(0)
          monitoring_df = monitoring_df.withColumn("total_upload_volume", monitoring_df("total_upload_volume")+monitoring_df("total_upload_volume_batch"))
          monitoring_df = monitoring_df.withColumn("total_download_volume", monitoring_df("total_download_volume")+monitoring_df("total_download_volume_batch"))
          monitoring_df = monitoring_df.withColumn("total_volume", monitoring_df("total_download_volume")+monitoring_df("total_upload_volume"))                                    

          
          monitoring_df.show()    
    }```

1 Ответ

0 голосов
/ 14 июля 2020

Я использую Databricks в течение некоторого времени, и одна из вещей, которая помогла мне в потоковой передаче, -

Databricks Delta Lake

Функция, которая делает Databricks Delta Lake актуальной здесь: - Он поддерживает «ровно один раз» обработку с более чем одним потоком (или одновременными пакетными заданиями).

Вы можете go с помощью приведенной выше документации, и Если не databricks delta lake, то вы можете включить некоторые из этих функций в свой проект.

...