В своем приложении потоковой передачи с искровым кинезисом я использую foreachBatch для получения потоковых данных, и мне нужно отправить их в механизм правил drools для дальнейшей обработки.
Мое требование - мне нужно собрать все json данные в list / ruleSession и отправьте их в обработчик правил для обработки в виде пакета на стороне исполнителя.
//Scala Code Example:
val dataFrame = sparkSession.readStream
.format("kinesis")
.option("streamName", streamName)
.option("region", region)
.option("endpointUrl",endpointUrl)
.option("initialPosition", "TRIM_HORIZON")
.load()
val query = dataFrame
.selectExpr("CAST(data as STRING) as krecord")
.writeStream
.foreachBatch(function)
.start()
query.awaitTermination()
val function = (batchDF: DataFrame, batchId: Long) => {
val ruleSession = kBase.newKieSession() //Drools Rule Session, this is getting created at driver side
batchDF.foreach(row => { // This piece of code is being run in executor.
val jsonData: JSONData = jsonHandler.convertStringToJSONType(row.mkString)
ruleSession.insert(jsonData) // Getting a null pointer exception here as the ruleSession is not available in executor.
}
)
ruleHandler.processRule(ruleSession) // Again this is in the driver scope.
}
В приведенном выше коде проблема, с которой я сталкиваюсь, заключается в следующем: функция, используемая в foreachBatch, получает выполняется на стороне драйвера, а код внутри batchDF.foreach выполняется на стороне работника / исполнителя, и, таким образом, не удается получить его ruleSession.
Есть ли способ запустить всю функцию на каждой стороне исполнителя?
ИЛИ
Есть ли лучший способ собрать все данные в пакете DataFrame после преобразования и отправить его следующему процессу из исполнителя / работника?