Я использую Spring Boot 2.1.1.RELEASE и Spring Cloud Greenwich.RC2, а управляемая версия для spring-cloud-stream-binder-kafka - 2.1.0RC4. Версия Kafka - 1.1.0. Я установил следующие свойства, так как сообщения не должны использоваться, если есть ошибка.
spring.cloud.stream.bindings.input.group=consumer-gp-1
...
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOnError=false
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=false
spring.cloud.stream.bindings.input.consumer.max-attempts=3
spring.cloud.stream.bindings.input.consumer.back-off-initial-interval=1000
spring.cloud.stream.bindings.input.consumer.back-off-max-interval=3000
spring.cloud.stream.bindings.input.consumer.back-off-multiplier=2.0
....
В теме Kafka есть 20 разделов, и Kerberos используется для аутентификации (не уверен, что это актуально).
Потребитель Kafka вызывает веб-службу для каждого обрабатываемого сообщения, и если веб-служба недоступна, то я ожидаю, что потребитель затем попытается обработать сообщение 3 раза, прежде чем оно перейдет к следующему сообщению. Поэтому для моего теста я отключил веб-сервис, и поэтому ни одно из сообщений не могло быть обработано правильно. Из журналов видно, что это происходит.
Через некоторое время я остановил, а затем перезапустил потребителя Kafka (веб-сервис по-прежнему отключен). Я ожидал, что после перезапуска потребителя Kafka он попытается обработать сообщения, которые не были успешно обработаны в первый раз. Из журналов (я распечатывал каждое сообщение с его полями) после перезапуска Kafka Consumer я не мог видеть, что это происходит. Я думал, что раздел может влиять на что-то, но я проверяю журналы и все 20 разделов были назначены этому единственному потребителю.
Есть ли свойство, которое я пропустил? Я думал, что ожидаемое поведение, когда я перезапускаю потребителя во второй раз, заключается в том, что брокер Kafka передаст записи, которые не были успешно обработаны, потребителю снова.
Спасибо