Мы храним смещение кафки в БД для контрольных точек.Это помогает в нулевой потере сообщений при перезапуске нашего приложения.
Существует сценарий, когда при перезапуске приложения spark смещение не отображается в кафке (из-за того, что оно было стерто из-за обновления или запуска в докере).В этом сценарии искровое приложение выдает ошибку
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
Мы бы хотели перезапустить приложение и в этом случае прочитать с последнего смещения.
try{
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
ssc, kafkaParams, fromOffset, msgHandler)
} catch {case ex: Exception => {
KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
}
}
Добавление try / catch и запуск с последнего смещенияздесь не помогаетТак как ошибка происходит у исполнителя.Есть ли способ справиться с этим из драйвера?