Я настроил Flink для предоставления метрик JMXReporter, как говорится здесь . Я также настроил рабочий узел так, чтобы он также представлял свои метрики главному узлу, как говорится здесь . Ниже приведена конфигурация файла conf/flink-conf.yaml
на главном и рабочем узле. Файлы журнала Flink говорят, что он подключен к серверу JMX. Затем я открываю VisualVM и подключаюсь к своему серверу JMX и перехожу на вкладку MBeans. Там я вижу пакет JobManager. Однако я не нахожу счетчик и счетчик, которые я создал. Я также установил плагин JConsole. Но когда я перехожу на вкладку JConsole, я вижу сообщение: No JConsole plugin installed. To install a JConsole plugin click the Configure Plugins button and provide full path to the plugin file/directory
. И согласно этой ссылке мне нужен файл WtJmxPlugin.jar
, который я не знаю, где найти.
# Metrics Reporter on the MASTER node
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789-8790
metrics.reporter.jmx.interval: 30 SECONDS
# Metrics Reporter on the WORKER node
env.java.opts: -Djava.net.preferIPv4Stack=true -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.56.1
У меня есть RichMapFunction
, который выставляет метрики.
public static class SensorTypeMapper
extends RichMapFunction<MqttSensor, Tuple2<CompositeKeySensorType, MqttSensor>> {
private static final long serialVersionUID = -4080196110995184486L;
private transient Counter counter;
private transient Meter meter;
@Override
public void open(Configuration config) {
this.counter = getRuntimeContext().getMetricGroup().counter("counterSensorTypeMapper");
com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();
this.meter = getRuntimeContext().getMetricGroup().meter("meterSensorTypeMapper",
new DropwizardMeterWrapper(dropwizardMeter));
}
@Override
public Tuple2<CompositeKeySensorType, MqttSensor> map(MqttSensor value) throws Exception {
this.meter.markEvent();
this.counter.inc();
// every sensor key: sensorId, sensorType, platformId, platformType, stationId
// Integer sensorId = value.getKey().f0;
String sensorType = value.getKey().f1;
Integer platformId = value.getKey().f2;
// String platformType = value.getKey().f3;
Integer stationId = value.getKey().f4;
CompositeKeySensorType compositeKey = new CompositeKeySensorType(stationId, platformId, sensorType);
return Tuple2.of(compositeKey, value);
}
}