Я новичок в Kafka, и у меня есть потребитель Kafka, реализованный с использованием библиотеки Java Apache Camel.Проблема, которую я обнаружил, - Потребитель занимает много времени (> 15 минут) для обработки нескольких сообщений, что вполне подходит для нашего варианта использования.
Нужна помощь по настройке, поскольку то же сообщение повторно отправляется после15 минут, если не обрабатываются в течение 15 минут (контроль потока не возвращает, я считаю).Я думаю, что это может быть интервал по умолчанию, но я не уверен, какое это свойство.
Итак, где я должен исправить конфигурацию
- Уровень производителя, чтобы он не пересылался
- Или Производитель не участвует в повторной отправке, это сервер Broker - Kafka, поэтому Потребитель должен подтвердить сообщение - в моем случае непосредственно перед обработкой.
Мой производитель обладает этими свойствами:
<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
Мои потребительские конфиги выглядят так:
<endpoint id="apolloKafkaJanitorEventListenerURI"
uri="kafka:${kafka.bootstrap.servers}?topic=${apollo.janitor.event.topic}&
groupId=${apollo.janitor.event.group.id}&
consumersCount=${apollo.janitor.event.consumer.count}&
consumerRequestTimeoutMs=${eventConsumerRequestTimeoutMs}&
sessionTimeoutMs=${eventConsumerSessionTimeoutMs}&
maxPartitionFetchBytes=${eventConsumerMaxPartitionFetchBytes}" />
Я гуглил, не нашел никаких проблем.Найдено свойство "acks = 0" на Producer и для Consumer следующее.Не проверял, но хочу узнать, нахожусь ли я на правильном пути первым
KafkaManualCommit manual =
exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
manual.commitSync();
https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/docs/kafka-component.adoc