Перезапуск кафки искры при отсутствии смещения в пределах диапазона - PullRequest
0 голосов
/ 07 июня 2018

Мы храним смещение кафки в БД для контрольных точек.Это помогает в нулевой потере сообщений при перезапуске нашего приложения.

Существует сценарий, когда при перезапуске приложения spark смещение не отображается в кафке (из-за того, что оно было стерто из-за обновления или запуска в докере).В этом сценарии искровое приложение выдает ошибку

 java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative

Мы бы хотели перезапустить приложение и в этом случае прочитать с последнего смещения.

try{
 KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffset, msgHandler)
} catch {case ex: Exception => {
KafkaUtils.createDirectStream[String, String,
    StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
 }
}

Добавление try / catch и запуск с последнего смещенияздесь не помогаетТак как ошибка происходит у исполнителя.Есть ли способ справиться с этим из драйвера?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...