Я создал класс, который реализует org.apache.kafka.common.metrics.KafkaMetric
примерно так:
public class DatadogMetricTracker implements MetricsReporter {
@Override
public void configure(Map<String, ?> configs) {
System.out.println(configs);
}
@Override
public void init(List<KafkaMetric> metrics) {
System.out.println(metrics);
}
@Override
public void metricChange(KafkaMetric metric) {
System.out.println(metric.metricName().name() + ": " + metric.value() + " tags: " + metric.metricName().tags());
}
@Override
public void metricRemoval(KafkaMetric metric) {
}
@Override
public void close() {
}
}
Затем я регистрирую класс в качестве метрического репортера при настройке реквизита Кафки:
properties.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.myco.utils.DatadogMetricTracker");
Когда я запускаю моего потребителя, configure
вызывается и init
, затем metricChange
вызывается один раз с партией метрик, для которых все значения равны 0 или -Infinity, и больше никогда не вызывается. Как мне снова запустить мой метрический рекордер?
Спасибо!