Как отслеживать отставание потребителей кафки для транзакционных потребителей - PullRequest
0 голосов
/ 11 июня 2019

Существует полезная метрика для мониторинга отставания потребителя Kafka в spring-kafka, называемая kafka_consumer_records_lag_max_records. Но этот показатель не работает для транзакционных потребителей. Существует ли конкретная конфигурация для включения показателя задержки для транзакционных потребителей?

Я настроил свою группу потребителей для работы с уровнем изоляции read_committed, а показатель содержит kafka_consumer_records_lag_max_records{client_id="listener-1",} -Inf

1 Ответ

0 голосов
/ 11 июня 2019

Что вы подразумеваете под "не работает"? Я только что проверил, и он отлично работает ...

@SpringBootApplication
public class So56540759Application {

    public static void main(String[] args) throws IOException {
        ConfigurableApplicationContext context = SpringApplication.run(So56540759Application.class, args);
        System.in.read();
        context.close();
    }

    private MetricName lagNow;

    private MetricName lagMax;

    @Autowired
    private MeterRegistry meters;

    @KafkaListener(id = "so56540759", topics = "so56540759", clientIdPrefix = "so56540759",
            properties = "max.poll.records=1")
    public void listen(String in, Consumer<?, ?> consumer) {
        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
        Metric currentLag = metrics.get(this.lagNow);
        Metric maxLag = metrics.get(this.lagMax);
        System.out.println(in
                + " lag " + currentLag.metricName().name() + ":" + currentLag.metricValue()
                + " max " + maxLag.metricName().name() + ":" + maxLag.metricValue());
        Gauge gauge = meters.get("kafka.consumer.records.lag.max").gauge();
        System.out.println("lag-max in Micrometer: " + gauge.value());
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56540759", 1, (short) 1);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        Set<String> tags = new HashSet<>();
        FetcherMetricsRegistry registry = new FetcherMetricsRegistry(tags, "consumer");
        MetricNameTemplate temp = registry.recordsLagMax;
        this.lagMax = new MetricName(temp.name(), temp.group(), temp.description(),
                Collections.singletonMap("client-id", "so56540759-0"));
        temp = registry.partitionRecordsLag;
        Map<String, String> tagsMap = new LinkedHashMap<>();
        tagsMap.put("client-id", "so56540759-0");
        tagsMap.put("topic", "so56540759");
        tagsMap.put("partition", "0");
        this.lagNow = new MetricName(temp.name(), temp.group(), temp.description(), tagsMap);

        return args -> IntStream.range(0, 10).forEach(i -> template.send("so56540759", "foo" + i));
    }

}
2019-06-11 12:13:45.803  INFO 32187 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = so56540759-0
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = so56540759
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_committed
    ...
    transaction.timeout.ms = 60000
    ...

2019-06-11 12:13:45.840  INFO 32187 --- [o56540759-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so56540759-0]
foo0 lag records-lag:9.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo1 lag records-lag:8.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo2 lag records-lag:7.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo3 lag records-lag:6.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo4 lag records-lag:5.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo5 lag records-lag:4.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo6 lag records-lag:3.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo7 lag records-lag:2.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo8 lag records-lag:1.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0
foo9 lag records-lag:0.0 max records-lag-max:9.0
lag-max in Micrometer: 9.0

EDIT2

Я делаю вижу, что в MBean будет -Infinity, если транзакция истекает - то есть, если в моем тесте слушатель не завершает свою работу в течение 60 секунд.

enter image description here

...