DStreams: переменная, созданная в foreachRDD и затем измененная внутри foreachPartition, сбрасывается один раз за пределы foreachPartition? - PullRequest
0 голосов
/ 25 июня 2019

У меня есть куча сообщений в kafka, и я использую потоковую обработку для обработки этих сообщений.

Я пытаюсь поймать, когда мой код не может быть вставлен в мою БД, а затем взять эти сообщения и вставить их обратно в Kafka, чтобы я мог обработать их позже.

Чтобы бороться с этим, я создаю переменную внутри моей функции foreachRDD, которая называется «success». Затем, когда я пытаюсь обновить БД, я возвращаю логическое значение для успешной вставки. Во время тестирования я заметил, что это не очень хорошо работает, когда я пытаюсь вставить его в foreachPartition. Кажется, что значение успеха «сбрасывается», когда я выхожу за пределы функции foreachPartition.

stream: DStream[String]

stream
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      var success = true
      rdd.foreachPartition(partitionOfRecords => {
        if (partitionOfRecords.nonEmpty) {
          val listOfRecords = partitionOfRecords.toList
          val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
          logger.info("Insert was successful: " + successfulInsert)
          if (!successfulInsert) {
            logger.info("logging successful as false. Currently its set to: " + success )
            success = false
            logger.info("logged successful as false. Currently its set to: " + success )

          }
        }
      })

      logger.info("Insert into database successful from all partition: " + success)
      if (!success) {
        // send data to Kafka topic
      }

    }
  })

Выходные данные из моих журналов показывают это!

2019-06-24 20:26:37 [INFO] Вставка прошла успешно: false 2019-06-24 20:26:37 [INFO] регистрация успешна как ложная. В настоящее время установлено: true 2019-06-24 20:26:37 [INFO] зарегистрирован успешно как ложный. В настоящее время установлено: false 2019-06-24 20:26:37 [INFO] Вставить в базу данных успешно из всех разделов: true

Несмотря на то, что в 3-м журнале говорится, что в настоящее время для параметра "success" установлено значение false, тогда, когда я выхожу за пределы foreachPartition, я снова регистрирую его, и для него снова устанавливается значение true.

Может кто-нибудь объяснить, почему? Или предложить другой подход?

1 Ответ

0 голосов
/ 25 июня 2019

Я смог заставить это работать, используя аккумулятор.

stream: DStream[String]

val dbInsertACC = sparkSession.sparkContext.longAccumulator("insertSuccess")

stream
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      //could maybe put accumulator here?
      rdd.foreachPartition(partitionOfRecords => {
        if (partitionOfRecords.nonEmpty) {
          val listOfRecords = partitionOfRecords.toList
          val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
          logger.info("Insert was successful: " + successfulInsert)
          if (!successfulInsert) dbInsertACC.add(1)
        }
      })

      logger.info("Insert into database successful from all partition: " + success)
      if (!dbInsertACC.isZero) {
        // send data to Kafka topic
      }

    }
  })
...