Kafka Consumer получает такое же сообщение - PullRequest
0 голосов
/ 13 декабря 2018

Я новичок в Kafka, и у меня есть потребитель Kafka, реализованный с использованием библиотеки Java Apache Camel.Проблема, которую я обнаружил, - Потребитель занимает много времени (> 15 минут) для обработки нескольких сообщений, что вполне подходит для нашего варианта использования.

Нужна помощь по настройке, поскольку то же сообщение повторно отправляется после15 минут, если не обрабатываются в течение 15 минут (контроль потока не возвращает, я считаю).Я думаю, что это может быть интервал по умолчанию, но я не уверен, какое это свойство.

Итак, где я должен исправить конфигурацию

  • Уровень производителя, чтобы он не пересылался
  • Или Производитель не участвует в повторной отправке, это сервер Broker - Kafka, поэтому Потребитель должен подтвердить сообщение - в моем случае непосредственно перед обработкой.

Мой производитель обладает этими свойствами:

<entry key="bootstrap.servers" value="${kafka.bootstrap.servers}" />
       <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
       <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />

Мои потребительские конфиги выглядят так:

<endpoint id="apolloKafkaJanitorEventListenerURI"
            uri="kafka:${kafka.bootstrap.servers}?topic=${apollo.janitor.event.topic}&amp;
                                                        groupId=${apollo.janitor.event.group.id}&amp;
                                                        consumersCount=${apollo.janitor.event.consumer.count}&amp;
                                                        consumerRequestTimeoutMs=${eventConsumerRequestTimeoutMs}&amp;
                                                        sessionTimeoutMs=${eventConsumerSessionTimeoutMs}&amp;
                                                        maxPartitionFetchBytes=${eventConsumerMaxPartitionFetchBytes}" />

Я гуглил, не нашел никаких проблем.Найдено свойство "acks = 0" на Producer и для Consumer следующее.Не проверял, но хочу узнать, нахожусь ли я на правильном пути первым

KafkaManualCommit manual =
       exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
   manual.commitSync();

https://github.com/apache/camel/blob/master/components/camel-kafka/src/main/docs/kafka-component.adoc

Ответы [ 2 ]

0 голосов
/ 14 декабря 2018

Вы пытаетесь избежать нескольких доставок одного и того же сообщения.Это неправильный подход .

В системе обмена сообщениями вам приходится иметь дело с сообщениями, которые могут быть доставлены несколько раз , просто потому, что они необходимы для гарантии доставки сообщений в некоторых ситуациях ( см. Здесь для краткостиобъяснение ).

Вы не можете полностью избежать множественных доставок, не жертвуя другими аспектами своей системы.

Если вы вместо этого создадите своих идемпотентов Идемпотент , им все равно, будет ли сообщение доставлено брокером несколько раз.Таким образом, вам не нужно ограничивать своего брокера.

0 голосов
/ 13 декабря 2018

Проблема может быть на стороне производителя.Возможно, вам понадобится проверить, отправляет ли продюсер сообщение.Вы можете использовать логирование операторов для того же.или вы можете использовать семантику ровно один раз для производителя кафки.вам нужно только добавить дополнительное свойство для того же.

Другое может быть то, что ваш потребитель не фиксирует смещения.вам может потребоваться провести мозговой штурм на этом конце тоже

...