Я использую пружинную кафку 1.2.2. ВЫПУСК.В настоящее время я настроил шаблон повтора для контейнера, который не имеет BackOffPolicy и AlwaysRetryPolicy.Режим подтверждения - MANUAL_IMMEDIATE.
Когда SIGTERM, я разрешу обрабатывать текущее сообщение, и когда @KafkaListener будет вызван снова с новым сообщением, я выбрасываю RuntimeException для контейнера, который повторяется бесконечно и выдает исключение непрерывно.И через некоторое время выдается SIGKILL, и контейнер останавливается (я думаю, что есть лучший способ сделать это).Но с этим процессом сообщение, которое было повторено, извлекается после перезапуска потребителем, но передается без вызова KafkaListener.См. Смещение = 13 в приведенной ниже трассировке стека
Трассировка стека:
[20 May 2018 22:37:20] [ INFO] [] [ConsumerCoordinator onJoinComplete]:[262 ] - Setting newly assigned partitions [messages-0] for group listener
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer$2 onPartitionsAssigned]:[513 ] - Committing on assignment: {messages-0=OffsetAndMetadata{offset=13, metadata=''}}
[20 May 2018 22:37:20] [ INFO] [] [AbstractMessageListenerContainer$2 onPartitionsAssigned]:[278 ] - partitions assigned:[messages-0]
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer run]:[632 ] - Received: 0 records
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer run]:[632 ] - Received: 1 records
[20 May 2018 22:37:20] [TRACE] [] [KafkaMessageListenerContainer$ListenerConsumer doInvokeWithRecords]:[931 ] - Processing ConsumerRecord(topic = messages, partition = 0, offset = 13, CreateTime = 1526855737241, serialized key size = 31, serialized value size = 2032, headers = RecordHeaders(headers = [], isReadOnly = false), key = "some key", value = "some random data")
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer ackImmediate]:[749 ] - Committing: {messages-0=OffsetAndMetadata{offset=14, metadata=''}
И есть ли лучший способ остановить контейнер, когда я вижу, что выдается SIGTERM, чтобы @KafkaListener не вызывался с сообщениями.Я знаю, что в более поздних версиях (> 2.0.0) spring-kafka есть KafkaListenerEndpointRegistry, который может остановить всех потребителей.Но в настоящее время невозможно обновить до 2.0.0.
Любая помощь очень ценится.