Измерение времени ожидания события с помощью Flink CEP - PullRequest
0 голосов
/ 21 января 2019

Я реализовал шаблон с Flink CEP, который соответствует трем событиям, таким как A->B->C. После того как я определил свой шаблон, я генерирую

PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);

с PatternSelectFunction таким, что

patternStream.select(new MyPatternSelectFunction()).print();

Это работает как очарование, но меня интересует время события всех совпадающих событий. Я знаю, что традиционный потоковый API Flink предлагает богатые функции, которые позволяют регистрировать внутренний трекер задержки Flink, как описано в этом вопросе . Я также видел, что для Flink 1.8 добавлен новый RichPatternSelectFunction. Но, к сожалению, я не могу настроить Flink 1.8 с Flink CEP.

Наконец, есть ли способ получить время события для всех совпадающих событий?

1 Ответ

0 голосов
/ 21 января 2019

Вам не нужны богатые функции, чтобы использовать отслеживание задержки Flink.Вам просто нужно включить его, установив latencyTrackingInterval в положительное число в конфигурации Flink или ExecutionConfig, например,

env.getConfig().setLatencyTrackingInterval(1000);

, и затем вы сможете наблюдать результаты в своем решении для метрик или через REST.api (показатели задержки не отображаются в веб-интерфейсе Flink).

Документация

Обновление:

Статистика задержки - это показатели работы, ив списке, возвращаемом

http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics

Значения метрики задержки можно получить из

http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>

Эти метрики имеют имена, такие как

latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>

, где идентификаторы идентифицируют источники операторские узлы в графе заданий, между которыми измеряется задержка.

Например, я могу определить задержку 95-го процентиля между источником и одним из приемников в задании, которое я сейчас запускаю, с помощью этого запроса:

http://localhost:8081/jobs/94b189a96b98b3aafaba6db6aa8b770b/metrics?get=latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.fd0ee602f2fa8d310d9bd9f694e185f5.operator_subtask_index.0.latency_p95

В качестве альтернативы,Вы можете использовать ProcessFunction, чтобы добавить временные метки времени обработки к вашим событиям до того, как они войдут в часть CEP вашей работы, а затем использовать другую ProcessFunction для измерения истекшего времени.

...