Kafka Java Consumer не получает сообщения с того места, где он был остановлен в последний раз - PullRequest
0 голосов
/ 28 августа 2018

Я использую org.apache.kafka.clients.consumer.KafkaConsumer в своем приложении для обработки сообщений от Kafka. Я заметил следующее поведение в моем приложении. Если я убью свое приложение, а затем перезапущу его, все сообщения, которые производитель kafka отправляет между этими временными интервалами, не будут восприняты потребителем kafka в моем приложении при повторном запуске приложения. Вот как я настраиваю своего потребителя кафки в моем приложении

  val props = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> EnvironmentConfig.getKafkaBootStrapServers,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.GROUP_ID_CONFIG -> "myGrouop",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean))

// Create the consumer using props.
val consumer = new KafkaConsumer[String, String](props)

Чего мне не хватает в моей конфигурации? Пожалуйста, помогите!

1 Ответ

0 голосов
/ 28 августа 2018

После перезапуска потребитель будет пытаться перезапустить с последнего зафиксированного смещения. Если он не найден, по умолчанию он перезапускается с конца.

Я вижу, что вы отключили автоматическую фиксацию (ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean))). В этом случае вы вносите коррективы вручную?

Если нет, то это объясняет, почему ваш потребитель перезагружается с конца. Он не зафиксировал никаких смещений ранее, поэтому по умолчанию latest.

Для начала советую оставить авто-фиксацию включенной. После того, как вы выяснили, как это работает, возможно, при необходимости посмотрите на внесение коррекций вручную.

...