Как описано в документации, смещение должно быть зафиксировано только тогда, когда я фактически фиксирую (когда AckMode.MANUAL_IMMEDIATE или AckMode.MANUAL) или в конце выполнения слушателя, когда AckMode.RECORD, однако, находится в середине обработки метод, аннотированный @KafkaListener, приложение закрывается, сообщение не доставляется повторно, приложение начинает чтение со следующего действительного сообщения, и это текущее сообщение теряется (сообщение, которое обрабатывалось при перезапуске приложения), как мне добиться цель повторной обработки приложения незафиксированного сообщения, когда приложение перезапускается в середине обработки? Я также попытался настроить AUTO_OFFSET_RESET_CONFIG как самый ранний, последний и ни один без успеха в 3 моделях. В целях тестирования я создал topi c только с одним разделом и заставил слушателя использовать фабрику контейнеров, которую я определяю вручную. Springboot-версия 2.2.6
@Configuration
class KafkaTestConfiguration {
@Bean
fun producerFactory(): ProducerFactory<String, String> {
return DefaultKafkaProducerFactory(producerConfigs())
}
@Bean
fun consumerFactory(): ConsumerFactory<Any, Any> {
return DefaultKafkaConsumerFactory(consumerConfigs())
}
@Bean
fun producerConfigs(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return props
}
@Bean
fun consumerConfigs(): Map<String, Any> {
val props: MutableMap<String, Any> = HashMap()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 20000
props[ConsumerConfig.GROUP_ID_CONFIG] = "kafka-retry"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
return props
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
val factory: ConcurrentKafkaListenerContainerFactory<Any, Any> = ConcurrentKafkaListenerContainerFactory()
factory.consumerFactory = consumerFactory()
factory.consumerFactory.createConsumer()
val containerProperties = factory.containerProperties
containerProperties.isAckOnError = false
containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
containerProperties.commitLogLevel = LogIfLevelEnabled.Level.INFO
containerProperties.isLogContainerConfig = true
return factory
}
@Component
class KafkaListenerAck {
@KafkaListener(id = "listMsgAckConsumer", topics = ["kafkaListenerTest1"],
groupId = "kafka-retry",
concurrency = "1",
containerFactory = "kafkaListenerContainerFactory"
)
fun onMessage(data: ConsumerRecord<String, String>, acknowledgment: Acknowledgment?) {
println("listMsgAckConsumer1 - topic ${data.topic()} offset ${data.offset()} partition ${data.partition()} message ${data.value()}")
println("If stop container here, the next pool will not deliver the current unconfirmed message")
acknowledgment?.acknowledge()
}
}