Apache Storm с управлением смещением Kafka - PullRequest
0 голосов
/ 26 марта 2019

Я создал пример топологии со Storm, используя Кафку в качестве источника. Вот проблема, решение которой мне нужно.

Каждый раз, когда я убиваю топологию и запускаю ее снова, топология начинает обрабатываться с самого начала.

Предположим, сообщение A в теме X обработано топологией, а затем я уничтожаю топологию.

Теперь, когда я снова отправляю топологию, а Сообщение А все еще остается, Тема X. Она снова обрабатывается.

Есть ли решение, может быть, какое-то управление смещением для решения этой ситуации.

Ответы [ 2 ]

1 голос
/ 26 марта 2019

Не следует использовать storm-kafka для нового кода, он устарел, поскольку базовый клиентский API устарел в Kafka и удален с 2.0.0.Вместо этого используйте storm-kafka-client.

. С помощью storm-kafka-client вы хотите установить идентификатор группы и стратегию смещения первого опроса.

KafkaSpoutConfig.builder(bootstrapServers, "your-topic")
            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
            .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
            .build();

Приведенное выше описание заставит ваш носик начаться ссамое раннее смещение при первом запуске, а затем он переместится с того места, на котором остановился, если вы перезапустите его.Идентификатор группы используется Kafka для распознавания носика при его перезапуске, чтобы он мог вернуть сохраненную контрольную точку смещения.Другие стратегии смещения будут вести себя по-разному, вы можете проверить javadoc для перечисления FirstPollOffsetStrategy.

Носик будет проверять, как далеко он проходил периодически, также в настройке есть настройка для управления этим.Контрольная точка контролируется настройкой setProcessingGuarantee в конфигурации и может быть настроена на выбор как минимум один раз (только смещенные контрольные точки), максимум один раз (контрольная точка перед тем, как носик издает сообщение), и «в любое время»"(периодически проверяйте, игнорируя подтверждения).

Взгляните на одну из примеров топологий, включенных в Storm https://github.com/apache/storm/blob/dc56e32f3dcdd9396a827a85029d60ed97474786/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTopologyMainNamedTopics.java#L93.

1 голос
/ 26 марта 2019

При создании spoutconfig убедитесь, что он имеет фиксированный идентификатор spout, по которому он может идентифицировать себя после перезапуска.

С официального сайта Storm:

Внимание: при перезапуске-развернув топологию, убедитесь, что параметры SpoutConfig.zkRoot и SpoutConfig.id не были изменены, иначе spout не сможет прочитать информацию о своем предыдущем состоянии потребителя (то есть смещения) из ZooKeeper - что может привести к неожиданному поведениюи / или к потере данных, в зависимости от вашего варианта использования.

...