Flink включает метрики как для пропускной способности (numRecordsInPerSecond и numRecordsOutPerSecond), так и latency .
Если вы хотите более тщательно измерить сквозную задержку, вы можете добавить пользовательский показатель c в приемнике (или другом терминальном узле), который сравнивает временные метки в ваших событиях с текущим временем. Это выглядело бы примерно так:
public class LatencyMeasuringSink<T> extends RichSinkFunction<T> {
private transient DescriptiveStatisticsHistogram eventTimeLag;
private static final int EVENT_TIME_LAG_WINDOW_SIZE = 10_000;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
eventTimeLag = getRuntimeContext().getMetricGroup().histogram("eventTimeLag",
new DescriptiveStatisticsHistogram(EVENT_TIME_LAG_WINDOW_SIZE));
}
@Override
public void invoke(T dataPoint, Context context) throws Exception {
eventTimeLag.update(System.currentTimeMillis() - dataPoint.getTimeStampMs());
}
}
Возможно, вы захотите настроить своего производителя Kafka так, чтобы он включал в ваши события отметки времени LogAppendTime
и использовал их в качестве основы для сравнения. Это предполагает, конечно, что часы на разных задействованных машинах синхронизированы достаточно хорошо, чтобы это измерение имело смысл - или вы можете запустить тесты на одной машине.
FLIP-83: Flink Сквозная платформа тестирования производительности также может представлять интерес. Эта работа продолжается.