Spring Kafka не сбрасывает коммит сброса при работе приложения - PullRequest
0 голосов
/ 15 апреля 2020

Как описано в документации, смещение должно быть зафиксировано только тогда, когда я фактически фиксирую (когда AckMode.MANUAL_IMMEDIATE или AckMode.MANUAL) или в конце выполнения слушателя, когда AckMode.RECORD, однако, находится в середине обработки метод, аннотированный @KafkaListener, приложение закрывается, сообщение не доставляется повторно, приложение начинает чтение со следующего действительного сообщения, и это текущее сообщение теряется (сообщение, которое обрабатывалось при перезапуске приложения), как мне добиться цель повторной обработки приложения незафиксированного сообщения, когда приложение перезапускается в середине обработки? Я также попытался настроить AUTO_OFFSET_RESET_CONFIG как самый ранний, последний и ни один без успеха в 3 моделях. В целях тестирования я создал topi c только с одним разделом и заставил слушателя использовать фабрику контейнеров, которую я определяю вручную. Springboot-версия 2.2.6

    @Configuration
    class KafkaTestConfiguration {

        @Bean
        fun producerFactory(): ProducerFactory<String, String> {
            return DefaultKafkaProducerFactory(producerConfigs())
        }

        @Bean
        fun consumerFactory(): ConsumerFactory<Any, Any> {
            return DefaultKafkaConsumerFactory(consumerConfigs())
        }

        @Bean
        fun producerConfigs(): Map<String, Any> {
            val props: MutableMap<String, Any> = HashMap()
            props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
            props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
            return props
        }

        @Bean
        fun consumerConfigs(): Map<String, Any> {
            val props: MutableMap<String, Any> = HashMap()
            props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9094"
            props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
            props[ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG] = 20000
            props[ConsumerConfig.GROUP_ID_CONFIG] = "kafka-retry"
            props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
            props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = "false"
            return props
        }

        @Bean
        fun kafkaTemplate(): KafkaTemplate<String, String> {
            return KafkaTemplate(producerFactory())
        }

  @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<Any, Any> {
        val factory: ConcurrentKafkaListenerContainerFactory<Any, Any> = ConcurrentKafkaListenerContainerFactory()
        factory.consumerFactory = consumerFactory()
        factory.consumerFactory.createConsumer()
        val containerProperties = factory.containerProperties
        containerProperties.isAckOnError = false
        containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
        containerProperties.commitLogLevel = LogIfLevelEnabled.Level.INFO
        containerProperties.isLogContainerConfig = true
        return factory
    }

    @Component
    class KafkaListenerAck {
        @KafkaListener(id = "listMsgAckConsumer", topics = ["kafkaListenerTest1"],
                groupId = "kafka-retry",
                concurrency = "1",
                containerFactory = "kafkaListenerContainerFactory"
        )
        fun onMessage(data: ConsumerRecord<String, String>, acknowledgment: Acknowledgment?) {
            println("listMsgAckConsumer1 - topic ${data.topic()} offset ${data.offset()} partition ${data.partition()} message ${data.value()}")
            println("If stop container here, the next pool will not deliver the current unconfirmed message")
            acknowledgment?.acknowledge()
        }
    }

1 Ответ

1 голос
/ 15 апреля 2020

Смещение не будет зафиксировано, пока не будет вызван acknowledgment.acknowledge(). Установите для свойства контейнера commitLogLevel значение DEBUG, чтобы увидеть действие фиксации.

auto.offset.reset применяется только в том случае, если потребитель никогда не совершал смещение (только для новых групп потребителей).

Если вы не могу понять это из журнала; отредактируйте вопрос с помощью фрагмента журнала.

...