Я использую org.apache.kafka.clients.consumer.KafkaConsumer
в своем приложении для обработки сообщений от Kafka. Я заметил следующее поведение в моем приложении. Если я убью свое приложение, а затем перезапущу его, все сообщения, которые производитель kafka отправляет между этими временными интервалами, не будут восприняты потребителем kafka в моем приложении при повторном запуске приложения. Вот как я настраиваю своего потребителя кафки в моем приложении
val props = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> EnvironmentConfig.getKafkaBootStrapServers,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
ConsumerConfig.GROUP_ID_CONFIG -> "myGrouop",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean))
// Create the consumer using props.
val consumer = new KafkaConsumer[String, String](props)
Чего мне не хватает в моей конфигурации? Пожалуйста, помогите!