Spark Streaming - не удалось использовать Spark для группирования, а затем отфильтровать с помощью sparksql под каждой группой. - PullRequest
0 голосов
/ 02 октября 2019

Получение нулевых исключений или никаких действий при попытке использовать искру под foreachrdd и foreach.

Я понимаю, что искра может быть только от драйвера. Здесь я пытаюсь сгруппировать DStream на основе ключа (приводит к RDD ключа, list [Row]) и необходимо применить фильтр на основе динамических столбцов (пытаюсь создать искровой фрейм данных под каждым ключом со списком строк и используя spark sqlфильтровать / заказывать и применять лимит). Эта часть логики работает в режиме master local, но не в yarn.

Пытаясь найти другие варианты группировки, примените фильтр к каждой группе на основе динамических столбцов. Пожалуйста, дайте мне знать, если кто-то получил идею, не используя выше

 grouped = candidateWithTreatment.map(row => (row.getString(2) + "-" + row.getString(4), List(row))).reduceByKey(_ ::: _)


      grouped.foreachRDD(eachRDD => {     
         //    create spark session or sqlcontext
              eachRDD.foreach(eachRow => {        

     val rowsRDD = sqlContext.sparkContext.parallelize[Row](rows)
        val eachDF = sqlContext.createDataFrame(rowsRDD, columnSchema)
        eachModelDF.createOrReplaceTempView("table")

        val filteredDF = sqlContext.sql("select from table based on my dynamically generated sql"))
          .limit(fLimit.toInt).limit(sLimit.toInt)
          })
          })

Я не могу использовать потоковую структурированную искру, поскольку у нее есть проблемы с аутентификацией с kafka.

...