Я пытаюсь добиться отставания потребителя с помощью. NET Confluent.Kafka 1.4.0-RC1 (для Net472). Я могу получить желаемый результат, используя этот скрипт:
$ bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server localhost:9092 --group Grp1 --describe
Результат:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
Grp1 test3 1 15 15 0 rdkafka-ca76855f-7b66-4bf1-82bc-73e9a1c1cf71 /10.186.129.93 rdkafka
Grp1 test3 2 13 13 0 rdkafka-d64379dc-881a-4f6f-a793-51e832cc2f5a /10.186.129.93 rdkafka
Grp1 test3 0 9 9 0 rdkafka-a25bdb80-3b70-4e42-963e-d41ad9e2a99a /10.186.129.93 rdkafka
Grp1 test 0 68 68 0 - - -
Я не могу получить аналогичный отчет, используя клиентский код. NET. Вот код, который я пробовал - но ничего не получить, поскольку свойство consumer.Assignment
имеет пустую коллекцию.
private string WriteConsumerGroupLags(string bootstrapServers, string consumerGroupName) {
// kafka-console-consumer.bat --zookeeper MW45670117:2380 --topic powertelemetry --consumer-property group.id=test123 --consumer-property enable.auto.commit=true
StringBuilder sb = new StringBuilder();
sb.AppendLine("\n");
sb.AppendLine("Consumer Group Lag Report");
sb.AppendLine("-------------------------");
ConsumerConfig config = new ConsumerConfig {
BootstrapServers = bootstrapServers,
GroupId = consumerGroupName,
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()) {
foreach (TopicPartition tp in consumer.Assignment) {
string topic = tp.Topic;
int partitionID = tp.Partition.Value;
// gets the current position (offset) for the specific topic/partition
Offset offset = consumer.Position(new TopicPartition(topic, new Partition(partitionID)));
sb.AppendLine($"Offset value is: {offset.Value}");
// returns current commited offset for the current assignment
List<TopicPartitionOffset> tpos = consumer.Committed(TimeSpan.FromSeconds(4));
foreach (TopicPartitionOffset tpo in tpos) {
sb.AppendLine($"Commited offset for partition {tpo.TopicPartition} is {tpo.Offset}");
}
}
}
return sb.ToString();
}
Поиск как отставания потребителя, так и последнего смещения на раздел / группу потребителей.