Перезапустите kafka, подключите приемник и разъемы источника, чтобы прочитать с начала - PullRequest
0 голосов
/ 27 марта 2019

Я достаточно много искал по этому вопросу, но, похоже, нет хорошего руководства по этому вопросу.

Из того, что я искал, есть несколько вещей, которые следует учитывать:

  • Сброс внутренних тем соединителя раковины (состояние, конфигурация и смещение).
  • Реализация смещений соединителя источника зависит от конкретной реализации.

Вопрос. Есть ли необходимость в сбросе этих тем?

  • Удаление группы потребителей.
  • Перезапуск соединителя с другим именем (этоэто тоже вариант), но это, кажется, не совсем правильно.
  • Сброс группы потребителей с --reset-offsets до --to-earliest
  • Использование REST API (обеспечивает функцию сброса и чтения с начала)

Как лучше всего перезапустить приемник и соединитель источника для чтения с начала?

Ответы [ 2 ]

2 голосов
/ 27 марта 2019

Исходный соединитель:

  • Автономный режим: удалить файл смещения (/tmp/connect.offsets) или изменить имя соединителя.
  • Распределенный режим: изменить имя соединителя.

Sink Connector (оба режима) одним из следующих способов:

  • Изменение имени.
  • Сброс смещения для группы потребителей.Имя группы совпадает с именем соединителя.

Чтобы сбросить смещение, вы должны сначала удалить соединитель, сбросить смещение (./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --group connectorName --reset-offsets --to-earliest --execute --topic topicName), еще раз добавить ту же конфигурацию

Вы можете проверить следующий вопрос: СброситьJDBC Kafka Connector, чтобы начать тянуть строки с начала времени?

0 голосов
/ 28 июля 2019

Исходный соединитель Распределенный режим - есть еще одна опция, которая создает новое сообщение для темы смещения. Например я использую разъем источника JDBC: При просмотре темы смещения я вижу следующее:

./kafka-console-consumer.sh --zookeeper localhost:2181/kafka11-staging --topic kc-staging--offsets --from-beginning --property print.key=true

["referrer-family-jdbc-source",{"query":"query"}]   {"incrementing":100}

Теперь, чтобы сбросить это, я просто создаю другое сообщение с приращением: 0

Например: как изготовить из оболочки с ключом отсюда

./kafka-console-producer.sh \
  --broker-list `hostname`:9092 \
  --topic kc-staging--offsets \
  --property "parse.key=true" \
  --property "key.separator=|"
["referrer-family-jdbc-source",{"query":"query"}]|{"incrementing":0}

Обратите внимание, что вам необходимо сделать следующее:

  • Удалить соединитель.
  • Создает сообщение с соответствующим смещением, как я описал выше.
  • Создайте соединитель снова.
...