пружина кафка 1.2.2 грациозное отключение - PullRequest
0 голосов
/ 20 мая 2018

Я использую пружинную кафку 1.2.2. ВЫПУСК.В настоящее время я настроил шаблон повтора для контейнера, который не имеет BackOffPolicy и AlwaysRetryPolicy.Режим подтверждения - MANUAL_IMMEDIATE.

Когда SIGTERM, я разрешу обрабатывать текущее сообщение, и когда @KafkaListener будет вызван снова с новым сообщением, я выбрасываю RuntimeException для контейнера, который повторяется бесконечно и выдает исключение непрерывно.И через некоторое время выдается SIGKILL, и контейнер останавливается (я думаю, что есть лучший способ сделать это).Но с этим процессом сообщение, которое было повторено, извлекается после перезапуска потребителем, но передается без вызова KafkaListener.См. Смещение = 13 в приведенной ниже трассировке стека

Трассировка стека:

[20 May 2018 22:37:20] [ INFO] [] [ConsumerCoordinator  onJoinComplete]:[262 ] - Setting newly assigned partitions [messages-0] for group listener
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer$2 onPartitionsAssigned]:[513 ] - Committing on assignment: {messages-0=OffsetAndMetadata{offset=13, metadata=''}}
[20 May 2018 22:37:20] [ INFO] [] [AbstractMessageListenerContainer$2 onPartitionsAssigned]:[278 ] - partitions assigned:[messages-0]
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer        run]:[632 ] - Received: 0 records
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer        run]:[632 ] - Received: 1 records
[20 May 2018 22:37:20] [TRACE] [] [KafkaMessageListenerContainer$ListenerConsumer doInvokeWithRecords]:[931 ] - Processing ConsumerRecord(topic = messages, partition = 0, offset = 13, CreateTime = 1526855737241, serialized key size = 31, serialized value size = 2032, headers = RecordHeaders(headers = [], isReadOnly = false), key = "some key", value = "some random data")
[20 May 2018 22:37:20] [DEBUG] [] [KafkaMessageListenerContainer$ListenerConsumer ackImmediate]:[749 ] - Committing: {messages-0=OffsetAndMetadata{offset=14, metadata=''}

И есть ли лучший способ остановить контейнер, когда я вижу, что выдается SIGTERM, чтобы @KafkaListener не вызывался с сообщениями.Я знаю, что в более поздних версиях (> 2.0.0) spring-kafka есть KafkaListenerEndpointRegistry, который может остановить всех потребителей.Но в настоящее время невозможно обновить до 2.0.0.

Любая помощь очень ценится.

1 Ответ

0 голосов
/ 20 мая 2018

Реестр конечных точек существует с 1.0.1.x пользователям рекомендуется обновить до последней версии 1.3.x;см. здесь .

При остановке контейнера у слушателя лучше сделать это в новом потоке, иначе остановка будет отложена.

См. 2.1.x ContainerStoppingErrorHandler о том, как это сделать.Но, конечно, вам не нужно вызывать исключение после остановки.

Но с 1.x вам нужно будет отбрасывать любые последующие сообщения, которые уже были извлечены.

...