Существует еще один альтернативный подход
У Kafka Consumer есть API для получения конечной точки для каждого раздела темы
List partitions = new ArrayList<>();
for (PartitionInfo p : parts) {
partitions.add(new TopicPartition(topic, p.partition()));
}
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
Для каждого раздела темы вы можете получить последнее зафиксированное смещение. Вы можетелегко получить неизрасходованное отставание, используя эти 2 числа
отставание = (конец смещения, последний совершенный)
for (TopicPartition tp : offsets.keySet()) {
OffsetAndMetadata commitOffset = consumer.committed(new TopicPartition(tp.topic(), tp.partition()));
Long lag = commitOffset == null ? offsets.get(tp) : offsets.get(tp) - commitOffset.offset();
}