Не следует использовать storm-kafka
для нового кода, он устарел, поскольку базовый клиентский API устарел в Kafka и удален с 2.0.0.Вместо этого используйте storm-kafka-client
.
. С помощью storm-kafka-client
вы хотите установить идентификатор группы и стратегию смещения первого опроса.
KafkaSpoutConfig.builder(bootstrapServers, "your-topic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
.build();
Приведенное выше описание заставит ваш носик начаться ссамое раннее смещение при первом запуске, а затем он переместится с того места, на котором остановился, если вы перезапустите его.Идентификатор группы используется Kafka для распознавания носика при его перезапуске, чтобы он мог вернуть сохраненную контрольную точку смещения.Другие стратегии смещения будут вести себя по-разному, вы можете проверить javadoc для перечисления FirstPollOffsetStrategy.
Носик будет проверять, как далеко он проходил периодически, также в настройке есть настройка для управления этим.Контрольная точка контролируется настройкой setProcessingGuarantee
в конфигурации и может быть настроена на выбор как минимум один раз (только смещенные контрольные точки), максимум один раз (контрольная точка перед тем, как носик издает сообщение), и «в любое время»"(периодически проверяйте, игнорируя подтверждения).
Взгляните на одну из примеров топологий, включенных в Storm https://github.com/apache/storm/blob/dc56e32f3dcdd9396a827a85029d60ed97474786/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L93.