Можно ли сбросить смещения в тему для группы потребителей кафки в коннекторе кафки? - PullRequest
1 голос
/ 01 мая 2019

Мой коннектор приемника kafka читает из нескольких тем (настроенных на 10 задач) и обрабатывает более 300 записей из всех тем.На основании информации, содержащейся в каждой записи, соединитель может выполнять определенные операции.

Вот пример пары ключ: значение в записи триггера:

"REPROCESS":"my-topic-1"

После прочтения этой записи мне потребуется сбросить смещения темы 'my-topic-1' в каждом из ее разделов на 0.

Во многих местах я читал, что создается новый KafkaConsumer, подписка на разделы темы, затем вызов метода subscribe(...) - рекомендуемый способ.Например,

public class MyTask extends SinkTask { . . . @Override public void put(Collection<SinkRecord> records) { records.forEach(record -> { if (record.key().toString().equals("REPROCESS")) { reprocessTopicRecords(record); } else { // do something else } }); } private void reprocessTopicRecords(SinkRecord record) { KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer = new KafkaConsumer<>(reprocessorProps, deserializer, deserializer); reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()), new ConsumerRebalanceListener() { public void onPartitionsRevoked(Collection<TopicPartition> partitions) {} public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // do offset reset here } } ); } }

Однако приведенная выше стратегия не работает для моего случая, потому что: 1. Это зависит от того, происходит ли групповой перебаланс (не всегда) 2. 'разделы, передаваемые методу onPartitionsAssigned, являются динамически назначаемыми разделами, это означает, что они являются только подмножеством полного набора разделов, для которых потребуется сброс их смещения.Например, этому SinkTask будет назначено только 2 из 8 разделов, которые содержат записи для «my-topic-1».

Я также изучил использование assign(), но это не совместимо сраспределенная модель потребителей (группы потребителей) в реализации SinkConnector / SinkTask.

Я знаю, что инструмент командной строки kafka kafka-consumer-groups может делать именно то, что я хочу (я думаю): https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b

Подводя итог, я хочу сбросить смещения всех разделов для данной темы, используя Java API, и позволить Sink Connector регистрировать изменения смещения и продолжать делать то, что делал (обработка записей).

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 03 мая 2019

Мне удалось добиться сброса смещений для группы потребителей kafka connect с помощью ряда API-интерфейсов kafka-rest-proxy от Confluent: https://docs.confluent.io/current/kafka-rest/api.html

Для этой реализации больше не требуется подход «триггерная запись».описано в исходном посте и основано исключительно на Rest API.

  1. Временно удалите соединитель kafka (это удалит потребителей и соединителя)

  2. Создатьэкземпляр потребителя для той же группы потребителей ("connect -")

  3. Получить экземпляр подписки на запрошенную тему, которую требуется сбросить

  4. Проведите фиктивный опрос («подписка» оценивается лениво »)

  5. Сброс смещений тем в группах потребителей для указанной темы

  6. Сделатьфиктивный опрос ('поиск' оценивается лениво ') Фиксация текущего состояния смещения (в прокси) для потребителя

  7. Повторное создание соединителя kafka (с тем же именем соединителя)- после восстановления баланса потребителиприсоединитесь к группе и прочитайте последнее зафиксированное смещение (начиная с 0)

  8. Удалите временный экземпляр потребителя

Если вы можете использоватьCLI, шаги 2-6 можно заменить на:

kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

Что касается тех, кто пытается сделать это в коде коннектора kafka через нативные API-интерфейсы Java, у вас нетудача: - (

0 голосов
/ 01 мая 2019

Вы ищете метод поиска.Либо со смещением

consumer.seek(new TopicPartition("topic-name", partition), offset);

Или seekToBeginning

Однако я чувствую, что вы будете конкурировать с группой потребителей API Connect Sink.Другими словами, если вы настроите потребителя с отдельным идентификатором группы, то вы по существу потребляете записи здесь дважды из исходного раздела, один раз по Connect, а затем по своему собственному экземпляру потребителя.

Если вы явно не ищетеТакже подключите собственный потребительский экземпляр (который не выставлен), вы попадете в странное состояние.Например, ваша задача выполняется только для новых записей в теме, несмотря на тот факт, что ваш собственный потребитель будет смотреть на старое смещение, или вы все равно будете получать еще более новые события, все еще обрабатывая старые

Такжев конечном итоге вы можете получить событие повторной обработки в самом начале темы из-за политик хранения, например, истечения срока действия старых записей, из-за чего ваш потребитель вообще не прогрессирует и постоянно перебалансирует свою группу, пытаясь найти начало

...