Вам не нужны богатые функции, чтобы использовать отслеживание задержки Flink.Вам просто нужно включить его, установив latencyTrackingInterval
в положительное число в конфигурации Flink или ExecutionConfig, например,
env.getConfig().setLatencyTrackingInterval(1000);
, и затем вы сможете наблюдать результаты в своем решении для метрик или через REST.api (показатели задержки не отображаются в веб-интерфейсе Flink).
Документация
Обновление:
Статистика задержки - это показатели работы, ив списке, возвращаемом
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics
Значения метрики задержки можно получить из
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>
Эти метрики имеют имена, такие как
latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>
, где идентификаторы идентифицируют источники операторские узлы в графе заданий, между которыми измеряется задержка.
Например, я могу определить задержку 95-го процентиля между источником и одним из приемников в задании, которое я сейчас запускаю, с помощью этого запроса:
http://localhost:8081/jobs/94b189a96b98b3aafaba6db6aa8b770b/metrics?get=latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.fd0ee602f2fa8d310d9bd9f694e185f5.operator_subtask_index.0.latency_p95
В качестве альтернативы,Вы можете использовать ProcessFunction, чтобы добавить временные метки времени обработки к вашим событиям до того, как они войдут в часть CEP вашей работы, а затем использовать другую ProcessFunction для измерения истекшего времени.