Получение нулевых исключений или никаких действий при попытке использовать искру под foreachrdd и foreach.
Я понимаю, что искра может быть только от драйвера. Здесь я пытаюсь сгруппировать DStream на основе ключа (приводит к RDD ключа, list [Row]) и необходимо применить фильтр на основе динамических столбцов (пытаюсь создать искровой фрейм данных под каждым ключом со списком строк и используя spark sqlфильтровать / заказывать и применять лимит). Эта часть логики работает в режиме master local, но не в yarn.
Пытаясь найти другие варианты группировки, примените фильтр к каждой группе на основе динамических столбцов. Пожалуйста, дайте мне знать, если кто-то получил идею, не используя выше
grouped = candidateWithTreatment.map(row => (row.getString(2) + "-" + row.getString(4), List(row))).reduceByKey(_ ::: _)
grouped.foreachRDD(eachRDD => {
// create spark session or sqlcontext
eachRDD.foreach(eachRow => {
val rowsRDD = sqlContext.sparkContext.parallelize[Row](rows)
val eachDF = sqlContext.createDataFrame(rowsRDD, columnSchema)
eachModelDF.createOrReplaceTempView("table")
val filteredDF = sqlContext.sql("select from table based on my dynamically generated sql"))
.limit(fLimit.toInt).limit(sLimit.toInt)
})
})
Я не могу использовать потоковую структурированную искру, поскольку у нее есть проблемы с аутентификацией с kafka.