Мне нужно прочитать множество записей от начального смещения до конечного смещения. Я использую для этого преданного потребителя Кафку. Я в порядке с хотя бы раз семантической (в случае, если данный экземпляр приложения выходит из строя, и новый экземпляр приложения повторно считывает записи из этого начального смещения).
Итак, я могу использовать такой код?
private static KafkaConsumer<Long, String> createConsumer() {
final Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<>(props);
}
public void process() {
KafkaConsumer consumer = createConsumer();
TopicPartition topicPartition = new TopicPartition("topic", 2);
consumer.assign(List.of(topicPartition));
long startOffset = 42;
long endOffset = 100;
consumer.seek(topicPartition, startOffset);
boolean isRunning = true;
while (isRunning) {
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);
for (ConsumerRecord<Long, String> record : consumerRecords) {
if (record.offset() >= endOffset) {
isRunning = false;
break;
}
}
}
consumer.close();
}
Итак:
- У меня нет
commit()
- Я отключаю
auto-commit
- У меня нет
group-id
Это правильный код? Или у него есть какие-то скрытые проблемы?