Мой коннектор приемника kafka читает из нескольких тем (настроенных на 10 задач) и обрабатывает более 300 записей из всех тем.На основании информации, содержащейся в каждой записи, соединитель может выполнять определенные операции.
Вот пример пары ключ: значение в записи триггера:
"REPROCESS":"my-topic-1"
После прочтения этой записи мне потребуется сбросить смещения темы 'my-topic-1' в каждом из ее разделов на 0.
Во многих местах я читал, что создается новый KafkaConsumer
, подписка на разделы темы, затем вызов метода subscribe(...)
- рекомендуемый способ.Например,
public class MyTask extends SinkTask {
.
.
.
@Override
public void put(Collection<SinkRecord> records) {
records.forEach(record -> {
if (record.key().toString().equals("REPROCESS")) {
reprocessTopicRecords(record);
} else {
// do something else
}
});
}
private void reprocessTopicRecords(SinkRecord record) {
KafkaConsumer<JsonNode, JsonNode> reprocessorConsumer =
new KafkaConsumer<>(reprocessorProps, deserializer, deserializer);
reprocessorConsumer.subscribe(Arrays.asList(record.value().toString()),
new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// do offset reset here
}
}
);
}
}
Однако приведенная выше стратегия не работает для моего случая, потому что: 1. Это зависит от того, происходит ли групповой перебаланс (не всегда) 2. 'разделы, передаваемые методу onPartitionsAssigned
, являются динамически назначаемыми разделами, это означает, что они являются только подмножеством полного набора разделов, для которых потребуется сброс их смещения.Например, этому SinkTask будет назначено только 2 из 8 разделов, которые содержат записи для «my-topic-1».
Я также изучил использование assign()
, но это не совместимо сраспределенная модель потребителей (группы потребителей) в реализации SinkConnector / SinkTask.
Я знаю, что инструмент командной строки kafka kafka-consumer-groups
может делать именно то, что я хочу (я думаю): https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
Подводя итог, я хочу сбросить смещения всех разделов для данной темы, используя Java API, и позволить Sink Connector регистрировать изменения смещения и продолжать делать то, что делал (обработка записей).
Заранее спасибо.