Как собрать все записи в spark executor и обработать как пакет - PullRequest
1 голос
/ 05 февраля 2020

В своем приложении потоковой передачи с искровым кинезисом я использую foreachBatch для получения потоковых данных, и мне нужно отправить их в механизм правил drools для дальнейшей обработки.

Мое требование - мне нужно собрать все json данные в list / ruleSession и отправьте их в обработчик правил для обработки в виде пакета на стороне исполнителя.

//Scala Code Example:
    val dataFrame = sparkSession.readStream
          .format("kinesis")
          .option("streamName", streamName)
          .option("region", region)
          .option("endpointUrl",endpointUrl)
          .option("initialPosition", "TRIM_HORIZON")
          .load()


    val query = dataFrame
        .selectExpr("CAST(data as STRING) as krecord")
        .writeStream
        .foreachBatch(function)
        .start()

    query.awaitTermination()


     val function = (batchDF: DataFrame, batchId: Long) => {
       val ruleSession = kBase.newKieSession() //Drools Rule Session, this is getting created at driver side

         batchDF.foreach(row => { // This piece of code is being run in executor.
               val jsonData: JSONData = jsonHandler.convertStringToJSONType(row.mkString)
               ruleSession.insert(jsonData) // Getting a null pointer exception here as the ruleSession is not available in executor.
             }
           )

       ruleHandler.processRule(ruleSession) // Again this is in the driver scope.
      }

В приведенном выше коде проблема, с которой я сталкиваюсь, заключается в следующем: функция, используемая в foreachBatch, получает выполняется на стороне драйвера, а код внутри batchDF.foreach выполняется на стороне работника / исполнителя, и, таким образом, не удается получить его ruleSession.

Есть ли способ запустить всю функцию на каждой стороне исполнителя?

ИЛИ

Есть ли лучший способ собрать все данные в пакете DataFrame после преобразования и отправить его следующему процессу из исполнителя / работника?

Ответы [ 2 ]

0 голосов
/ 14 февраля 2020

@ codeaperature, Это то, как я получил пакет, вдохновленный вашим ответом, разместив его как ответ, так как это превышает ограничение по количеству слов в комментарии.

  • Использование foreach на фрейме данных и передача в ForeachWriter.
  • Инициализация сеанса правила в открытом методе ForeachWriter.
  • Добавление каждого ввода JSON в сеанс правила в методе процесса.
  • Выполнить правило в методе close с сеансом правила, загруженным пакетом данных.

// Scala код:

val dataFrame = sparkSession.readStream
          .format("kinesis")
          .option("streamName", streamName)
          .option("region", region)
          .option("endpointUrl",endpointUrl)
          .option("initialPosition", "TRIM_HORIZON")
          .load()

val query = dataFrame
  .selectExpr("CAST(data as STRING) as krecord")
  .writeStream
  .foreach(dataConsumer)
  .start()

val dataConsumer = new ForeachWriter[Row] {

  var ruleSession: KieSession = null;

  def open(partitionId: Long, version: Long): Boolean = { // first open is called once for every batch
    ruleSession = kBase.newKieSession()
    true
  }

  def process(row: Row) = { // the process method will be called for a batch of records
    val jsonData: JSONData = jsonHandler.convertStringToJSONType(row.mkString)
    ruleSession.insert(jsonData) // Add all input json to rule session.
  }

  def close(errorOrNull: Throwable): Unit = { // after calling process for all records in bathc close is called
    val factCount = ruleSession.getFactCount
    if (factCount > 0) {
      ruleHandler.processRule(ruleSession) //batch processing of rule
    }
  }
}
0 голосов
/ 12 февраля 2020

Я думаю, это может сработать ... Вместо того, чтобы запускать foreach, вы можете использовать foreachBatch или foreachPartition (или версию карты, например mapPartition, если вы хотите получить информацию о возврате). В этой части откройте соединение с системой drools. С этого момента перебирайте набор данных в каждом разделе (или пакете), отправляя каждый из них в систему drools (или вы можете отправить весь этот кусок в drools). В конце раздела foreachPartition / foreachBatch закройте соединение (если применимо).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...