Как использовать сообщения от Кафки в Java, начиная с указанного смещения c - PullRequest
1 голос
/ 24 февраля 2020

чтение с самого раннего:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

чтение с самого последнего:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

но какую строку мне следует использовать, если я хочу начать, например: с 18-го коммита вперед?

1 Ответ

1 голос
/ 24 февраля 2020

Вы можете использовать seek(), чтобы заставить потребителя начать потреблять с указанного c смещения :

public void seek(TopicPartition partition, long offset)

Переопределяет смещения выборки, которые потребитель будет использовать в следующем poll(timeout). Если этот API вызывается для одного и того же раздела более одного раза, последнее смещение будет использовано для следующего poll(). Обратите внимание, что вы можете потерять данные, если этот API произвольно используется в середине потребления, чтобы сбросить смещения выборки


Например, предположим, что вы хотите начать со смещения 18:

TopicPartition tp = new TopicPartition("myTopic", 0);
Long startOffset = 18L

List<TopicPartition> topics = Arrays.asList(tp);
consumer.assign(topics);
consumer.seek(topicPartition, startOffset);

// Then consume messages as normal..
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...