Я смотрю на сервис весенней загрузки, который читает сообщения от apache kafka, запрашивает записи, указанные в сообщении, из другого сервиса через http, обрабатывает их, сохраняет некоторые данные в базу данных и публикует результаты в другой теме.
это делается через
@StreamListener(Some.INPUT)
@SendTo(Some.OUTPUT)
это делается в нескольких сервисах и в целом работает просто отлично.Единственный набор свойств
spring.cloud.stream.binder.consumer.concurrency=20
сама тема имеет 20 разделов, которые должны соответствовать.
При мониторинге чтений из kafka мы обнаружили очень низкую пропускную способность и странное поведение:
приложение считывает до 500 сообщений одновременно, затем 1-2 минуты ничего.в течение этого времени потребитель неоднократно регистрирует, что он «пропускает тактовые импульсы, потому что раздел был перебалансирован», «переназначает разделы», а иногда даже выдает исключение, говоря, что «не удалось зафиксировать, потому что интервал опроса истек»
Мы пришли к выводу, что это означает, что потребитель получает 500 сообщений, занимает много времени, чтобы обработать все из них, пропускает свое временное окно и, следовательно, не может передать ни одно из 500 сообщений посреднику - который переназначает раздел и повторно отправляетснова и снова одни и те же сообщения.
После просмотра потоков и документов я обнаружил свойство "max.poll.records", но в качестве места для установки этого свойства я нашел противоречивые предложения.
некоторые говорят, чтоустановите его в
spring.cloud.stream.bindings.consumer.<input>.configuration
некоторые говорят
spring.cloud.stream.kafka.binders.consumer-properties
Я попытался установить оба значения в 1, но поведение служб не изменилось.
Как правильно обрабатыватьв случае, если потребитель не может идти в ногу с требуемым интервалом опроса с настройками по умолчанию?
common-yaml:
spring.cloud.stream.default.group=${spring.application.name}
service-yaml
spring:
clould:
stream:
default:
consumer.headerMode: embeddedHeaders
producer.headerMode: embeddedHeaders
bindings:
someOutput:
destination: outTopic
someInput:
destination: inTopic
consumer:
concurrency: 30
kafka:
bindings:
consumer:
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
someInput:
configuarion:
max.poll.records: 20 # ConsumerConfig ignores this
consumer:
enableDlq: true
configuarion:
max.poll.records: 30 # ConsumerConfig ignores this
binder:
consumer-properties:
max.poll.records: 10 # this gets used first
configuration:
max.poll.records: 40 # this get used when the first one is not present
Всегда игнорируется это значение, если не установлено другое свойство, ConsumerConfiguration сохраняет значение по умолчанию 500 для записей максимального опроса
РЕДАКТИРОВАТЬ .: мы стали ближе:
Проблема была связана с пружинным повтором, имеющим набор exponentialBackoffStrategy - и кучей ошибок, которые эффективно останавливали приложение.
Чтоя не понимаю, мы вынудили 200 ошибок, отправив искаженные сообщения в рассматриваемую тему, что приводит к чтению приложения 200, возрасту (со старой конфигурацией повторных попыток) и последующей фиксации всех 200 ошибок одновременно.
Как это имеет смысл, если у нас есть
max.poll.records: 1
concurrency: 1
ackEachRecod = true
enableDlq: true # (which implicitly makes autoCommitOffsets = true according to the docs)