Доступна ли какая-либо конфигурация для очистки незафиксированного сообщения во время перезапуска kafka или потребителя? - PullRequest
0 голосов
/ 24 апреля 2020

У меня есть бизнес-сценарий, когда потребители не должны принимать зафиксированные / незафиксированные сообщения от topi c при перезапуске потребителя или kafka. Я попытался применить auto.offset.reset: latest. Но он вытягивает незафиксированные смещения из topi c. Например, для приложения с одним экземпляром с 1 topi c и 1 разделом. Предположим, я отправил 10 сообщений, потребитель выбрал 5 сообщений и зафиксировал смещение. Теперь я перезапускаю любой экземпляр потребителя /kafka. После перезапуска он не должен выбирать старые 5 сообщений, которые не были зафиксированы. Поиск любой другой конфигурации или обходных путей.

Ответы [ 2 ]

0 голосов
/ 24 апреля 2020

Используйте уникальный group.id (например, UUID) при каждом запуске или seekToEnd каждый назначенный раздел во время запуска.

См. В поисках спецификации c Смещение .

0 голосов
/ 24 апреля 2020

Необходимо убедиться, что ваш потребитель получает новую группу потребителей (в Java API конфигурация потребителя для этого называется group.id) при каждом перезапуске приложения. Даже если вы перезапустите своего брокера, вы все равно перезапустите приложение с новым group.id. И сохраните конфигурацию auto.offset.reset=latest.

Другой вариант - вручную изменить смещения группы потребителей после каждого перезапуска брокера. Kafka поставляется с инструментом ConsumerGroupCommand. Вы можете найти некоторую информацию в документации Kafka Управление группами потребителей .

Если вы планируете сбросить определенную группу потребителей («myConsumerGroup»), вы можете использовать

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group myConsumerGroup --topic topic1 --to-latest

В зависимости от ваших требований вы можете сбросить смещения для каждого раздела topi c с помощью этого инструмента. Справочная функция или документация объясняют параметры.

...