Чтобы прочитать сообщения от начального смещения до конечного смещения, сначала необходимо использовать seek()
, чтобы переместить потребителя в нужное начальное местоположение, а затем poll()
, пока вы не достигнете желаемого конечного смещения.
Например, потреблять со смещения от 100 до 200:
String topic = "test";
TopicPartition tp = new TopicPartition(topic, 0);
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Move to the desired start offset
consumer.seek(tp, 100L);
}
});
boolean run = true;
long lastOffset = 200L;
while (run) {
ConsumerRecords<String, String> crs = consumer.poll(Duration.ofMillis(100L));
for (ConsumerRecord<String, String> record : crs) {
System.out.println(record);
if (record.offset() == lastOffset) {
// Reached the end offsey, stop consuming
run = false;
break;
}
}
}
}