Текущая версия 2.2.x: 2.2.10.
Если вы используете Spring Boot:
@Bean
public ApplicationRunner runner(KafkaAdmin admin, ConsumerFactory<String, String> cf) {
return args -> {
Consumer<String, String> consumer = cf.createConsumer("group", "clientId", "");
try (AdminClient client = AdminClient.create(admin.getConfig())) {
Collection<ConsumerGroupListing> groups = client.listConsumerGroups()
.all()
.get(10, TimeUnit.SECONDS);
groups.forEach(group -> {
Map<TopicPartition, OffsetAndMetadata> map = null;
try {
map = client.listConsumerGroupOffsets(group.groupId())
.partitionsToOffsetAndMetadata()
.get(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
e.printStackTrace();
}
catch (ExecutionException e) {
e.printStackTrace();
}
catch (TimeoutException e) {
e.printStackTrace();
}
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(map.keySet());
map.forEach((tp, off) -> {
System.out.println("group: " + group + " tp: " + tp
+ " current offset: " + off.offset()
+ " end offset: " + endOffsets.get(tp));
});
});
}
finally {
consumer.close();
}
};
}
Если вы не используете Spring Boot, используйте AdminClient.create()
для созданияадминистратор, создать потребителя и сделать то же самое.