@ Nimo1981 Так что это реализация с простой Java.Я не уверен, отвечает ли это вашим потребностям.Таким образом, в основном я передаю смещение 0 (то есть, даже если я читаю из темы Кафки, я возвращаюсь к смещению, которое находится в начале). Я не уверен, рассматривали ли вы эту реализацию, но, пожалуйста, дайте мне знать, если этоэто то, что вы ищете
Не указывайте CommitCountObj.Это не нужно для вашего.Поэтому по умолчанию offsetMap будет иметь следующую запись смещения, подобную этой,
offsetMap.put (new TopicPartition (record.topic (), record.partition ()), новый OffsetAndMetadata (record.offset () + 1, «какое-то сообщение об успешной фиксации»));
, но для вашего случая использования я немного изменен, он работает хорошо, когда потребитель не перезапускается
offsetMap.put (new TopicPartition (record.topic (), record.partition ()), новый OffsetAndMetadata (0, «фиксация не выполнена»));
public class KafkaConsumerClass {
private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaConsumerClass.class);
private CommitCountClass commitCountobj = new CommitCountClass();
public Consumer<String, List<FeedBackConsumerClass>> createConsumer() {
Map<String, Object> consumerProps = new HashMap<String, Object>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:7070,localhost:7072");
consumerProps.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 50000);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "first_group-client1");
// consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "first_group");
// consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, KafkaConsumerInterceptor.class);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1500);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new KafkaConsumer<String, List<FeedBackConsumerClass>>(consumerProps);
}
public void consumeRecord() {
log.info("Coming inside consumer consumer");
ArrayList<String> topicList = new ArrayList<String>();
topicList.add("topic1");
commitCountobj.setCount(0);
Consumer<String, List<FeedBackConsumerClass>> kafkaConsumer = createConsumer();
kafkaConsumer.subscribe(topicList);
log.info("after subscribing");
Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
while (true) {
ConsumerRecords<String, List<FeedBackConsumerClass>> recordList = kafkaConsumer.poll(Long.MAX_VALUE);
// kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
log.info("Inside while loop:" + recordList);
if (!recordList.isEmpty()) {
recordList.forEach(record -> {
int i = 0;
System.out.println(record.toString());
// we can make the call to the API here
// call the db here or any API and process the record
// then call the code to commit
// since the commit is switched off, it becomes a developers responsibility to do the auto commit
offsetMap.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(0, "no metadata/offset commited"));
// here we are incrementing the offsetMap so that we are making sure we are storing the
// next set of offsets in the map
if (commitCountobj.getCount() % 1000 == 0) {
kafkaConsumer.commitAsync(offsetMap, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
// TODO Auto-generated method stub
if (exception != null) {
// retry it now with a sync
// possibility of error occuring here as well
// so capture the exception and exit the consumer gracefully
kafkaConsumer.commitSync();
log.error(exception.getMessage());
}
}
});
}
commitCountobj.setCount(i++);
});
}
}
}
}