У меня есть куча сообщений в kafka, и я использую потоковую обработку для обработки этих сообщений.
Я пытаюсь поймать, когда мой код не может быть вставлен в мою БД, а затем взять эти сообщения и вставить их обратно в Kafka, чтобы я мог обработать их позже.
Чтобы бороться с этим, я создаю переменную внутри моей функции foreachRDD, которая называется «success». Затем, когда я пытаюсь обновить БД, я возвращаю логическое значение для успешной вставки. Во время тестирования я заметил, что это не очень хорошо работает, когда я пытаюсь вставить его в foreachPartition. Кажется, что значение успеха «сбрасывается», когда я выхожу за пределы функции foreachPartition.
stream: DStream[String]
stream
.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
var success = true
rdd.foreachPartition(partitionOfRecords => {
if (partitionOfRecords.nonEmpty) {
val listOfRecords = partitionOfRecords.toList
val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
logger.info("Insert was successful: " + successfulInsert)
if (!successfulInsert) {
logger.info("logging successful as false. Currently its set to: " + success )
success = false
logger.info("logged successful as false. Currently its set to: " + success )
}
}
})
logger.info("Insert into database successful from all partition: " + success)
if (!success) {
// send data to Kafka topic
}
}
})
Выходные данные из моих журналов показывают это!
2019-06-24 20:26:37 [INFO] Вставка прошла успешно: false
2019-06-24 20:26:37 [INFO] регистрация успешна как ложная. В настоящее время установлено: true
2019-06-24 20:26:37 [INFO] зарегистрирован успешно как ложный. В настоящее время установлено: false
2019-06-24 20:26:37 [INFO] Вставить в базу данных успешно из всех разделов: true
Несмотря на то, что в 3-м журнале говорится, что в настоящее время для параметра "success" установлено значение false, тогда, когда я выхожу за пределы foreachPartition, я снова регистрирую его, и для него снова устанавливается значение true.
Может кто-нибудь объяснить, почему? Или предложить другой подход?