Мы используем Flink 1.8.0 и запускаем его на EMR - Yarn и хотим измерить пропускную способность.
- Поскольку наши операторы объединены в цепочку, мы добавили счетчики и счетчики в наш код - по сути, это асинхронный оператор c, который выполняет вызовы API с кинезисом как в качестве источника, так и в качестве синтаксиса c. В Application Master, то есть в веб-интерфейсе Flink, мы можем получить значение для счетчиков, но не для счетчиков.
public class AsyncClass extends RichAsyncFunction<String, String> {
private transient Counter counter;
private transient Meter meter;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter()));
}
@Override
public void close() throws Exception {
super.close();
ExecutorUtils.gracefulShutdown(20000, TimeUnit.MILLISECONDS, executorService);
}
@Override
public void asyncInvoke(String key, final ResultFuture<String> resultFuture) throws Exception {
resultFuture.complete(key);
this.meter.markEvent();
this.counter.inc();
}
}
Чтобы измерить полную пропускную способность приложения, нам, очевидно, нужна пропускная способность всех менеджеров задач вместе. Используя счетчики, мы можем получить показатели для отдельных менеджеров задач. Есть ли способ измерить его на уровне оператора?