Кафка незафиксированное сообщение больше не потребляется - PullRequest
0 голосов
/ 27 марта 2019

Я обрабатываю сообщения kafka и вставляю их в таблицу kudu, используя потоковую обработку с использованием ручной фиксации смещения, вот мой код.

val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean),
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest" //"latest" //"earliest"
 )
val stream = KafkaUtils.createDirectStream[String, String](
                        ssc,
                        PreferConsistent,
                        Subscribe[String, String](topicsSet, kafkaParams)
                       )
stream.foreachRDD { rdd =>
var offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
//offsetRanges.foreach(println)
var msgOffsetsRdd = rdd.map(msg =>{
val msgOffset = OffsetRange(msg.topic(), msg.partition(),  msg.offset(), msg.offset()+1)
        println(msg)
        msgOffset 
      }
    )
   val msgOffsets = msgOffsetsRdd.collect() //here idea was to get only processed messages offsets for commit
   stream.asInstanceOf[CanCommitOffsets].commitAsync(msgOffsets)
}

Давайте приведем этот пример во время вставки данных в kudu, я получил нужную мне ошибкучтобы обработать эти сообщения еще раз, если я остановлю работу и начну ее снова, я смогу получить незафиксированное сообщение, разве мы не можем получить все незафиксированные сообщения в потоке?

1 Ответ

0 голосов
/ 27 марта 2019

У вас есть сообщение, почему бы не поставить логику повтора в случае сбоя. Kafka выдаст вам то же сообщение при повторном подключении в случае сбоя вашего потребителя. Не уверен, что Kafka выдаст то же сообщение, пока соединение еще открыто.

Вы можете иметь некоторую логику повторения в вашем коде, если сбой вызван недоступностью целевого хранилища данных, или если вставить ошибочный из-за неправильного формата сообщения, вы можете сохранить эти сообщения во временном кэше, хранилище данных или другой теме кафки, чтобы повторить попытку позже или изучите, что не так с этими сообщениями.

...