Я пытаюсь создать свою собственную переменную метрики в соответствии с этим руководством
С помощью предоставленного примера кода я могу получить события и гистограмму.
Я запутался, как этот идентификатор использовался prometheus & grafana. Я также пытаюсь немного изменить пример кода, но метрика просто больше не работает.
Кроме того, я могу получить доступ только к метрике системы, но не к своей собственной.
Мой вопрос:
- как я могу получить доступ к счетчику, который я создал? например counter1
- Что именно представляет собой metricGroup?
- Например, я хотел бы обнаружить шаблон из входного потока, и более разумно сделать это в метрике или просто вывести результат в базу данных временных рядов, такую как influenxdb?
спасибо заранее.
Вот функция карты
class FlinkMetricsExposingMapFunction extends RichMapFunction<SensorReading, SensorReading> {
private static final long serialVersionUID = 1L;
private transient Counter eventCounter;
private transient Counter customCounter1;
private transient Counter customCounter2;
@Override
public void open(Configuration parameters) {
eventCounter = getRuntimeContext()
.getMetricGroup().counter("events");
customCounter1 = getRuntimeContext()
.getMetricGroup()
.addGroup("customCounterKey", "mod2")
.counter("counter1");
customCounter2 = getRuntimeContext()
.getMetricGroup().addGroup("customCounterKey", "mod5").counter("counter2");
// meter = getRuntimeContext().getMetricGroup().meter("eventMeter", new DropwizardMeterWrapper(dropwizardMeter));
}
@Override
public SensorReading map(SensorReading value) {
eventCounter.inc();
if (value.getCurrTimestamp() % 2 == 0)
customCounter1.inc();
if (value.getCurrTimestamp() % 5 == 0)
customCounter2.inc();
if (value.getCurrTimestamp() % 2 == 0 && value.getCurrTimestamp() % 5 == 0)
customCounter1.dec();
return value;
}
}
Пример задания:
env
.addSource(new SimpleSensorReadingGenerator())
.name(SimpleSensorReadingGenerator.class.getSimpleName())
.map(new FlinkMetricsExposingMapFunction())
.name(FlinkMetricsExposingMapFunction.class.getSimpleName())
.print()
.name(DataStreamSink.class.getSimpleName());
Обновление
Снимок экранадля доступа к метрикам флинка из графана:
flink-config.yaml
FROM flink:1.9.0
RUN echo "metrics.reporters: prom" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
echo "metrics.latency.interval: 1000" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
echo "metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter" >> "$FLINK_HOME/conf/flink-conf.yaml"; \
mv $FLINK_HOME/opt/flink-metrics-prometheus-*.jar $FLINK_HOME/lib
COPY --from=builder /home/gradle/build/libs/*.jar $FLINK_HOME/lib/
функция карты по умолчанию из учебника:
@Override
public void open(Configuration parameters) {
eventCounter = getRuntimeContext().getMetricGroup().counter("events");
valueHistogram =
getRuntimeContext()
.getMetricGroup()
.histogram("value_histogram", new DescriptiveStatisticsHistogram(10_000_000));
}