Я обрабатываю сообщения kafka и вставляю их в таблицу kudu, используя потоковую обработку с использованием ручной фиксации смещения, вот мой код.
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" //"latest" //"earliest"
)
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicsSet, kafkaParams)
)
stream.foreachRDD { rdd =>
var offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//offsetRanges.foreach(println)
var msgOffsetsRdd = rdd.map(msg =>{
val msgOffset = OffsetRange(msg.topic(), msg.partition(), msg.offset(), msg.offset()+1)
println(msg)
msgOffset
}
)
val msgOffsets = msgOffsetsRdd.collect() //here idea was to get only processed messages offsets for commit
stream.asInstanceOf[CanCommitOffsets].commitAsync(msgOffsets)
}
Давайте приведем этот пример во время вставки данных в kudu, я получил нужную мне ошибкучтобы обработать эти сообщения еще раз, если я остановлю работу и начну ее снова, я смогу получить незафиксированное сообщение, разве мы не можем получить все незафиксированные сообщения в потоке?