Как выполнить задачу агрегации с помощью Flink CEP - PullRequest
0 голосов
/ 12 октября 2019

Мне нужно посчитать, сколько раз в день происходит A и за 15 минут - B。 Поток может быть A1, A2, B1, B2, A3, B3, B4, B5, A4, A5, A6,A7, B6。 В моем случае результаты события: A2, B1 A3, B3 A7, B6。 И мне нужно получать результат в реальном времени, когда происходит совпадение. Tired Я что-то устал? Но flink-sql-cep не поддерживает агрегацию. Это только вычисление события, произошедшего this В этом случае, как выполнить эту задачу с одним SQL.

Я устал два шага, чтобы сделать это. Я использую flink sql cep, чтобы сначала сопоставить ,, а затем опуститься до kafka. На первом шаге я использую pre kafka и использую над окном для агрегации.

первый шаг : выберите выводы как pin, 'first-step' как result_id, cast (order_amount as varchar) как result_value, event_time как result_time из stra_dtpipelineMATCH_RECOGNIZE (PARTITION BY pin
ORDER BY event_time MEASURES
t1.pin в качестве пинов, '1' в качестве order_amount, LOCALTIMESTAMP в качестве event_time ОДНА СТРОКА В МАТЧЕ ПОСЛЕ МАТЧА Пропустить следующую строку PATTERN (t1 t2) ВНУТРИ ИНТЕРВАЛА '30'ВТОРОЙ
DEFINE
t1 как t1.act_type = '100001', t2 как t2.act_type = '100002') второй шаг: выберите вывод, 'job5' как result_id, приведите (sum (1) over (PARTITION BY)pin, приведение (DATE_FORMAT (event_time, '% Y% m% d') в качестве VARCHAR) порядка по Event_time ROWS МЕЖДУ ИНТЕРВАЛОМ '1' DAY PRECEDING и CURRENT ROW) в качестве VARCHAR) в качестве result_value, CURRENT_TIMESTAMP в качестве result_time из stra_didpidfirst-step 'и DAYOFMONTH (CURRENT_DATE) = DAYOFMONTH (event_time)

Я ожидаю выполнить эту задачу с помощью одного SQL.

1 Ответ

0 голосов
/ 14 октября 2019

Вы можете объединить два запроса в один запрос, используя подзапрос или представление.

Это будет что-то вроде

SELECT a, b OVER (...) ORDER BY event_time FROM (SELECT x, y MATCH_RECOGNIZE ...) WHERE ...

или

CREATE VIEW pattern AS SELECT x, y MATCH_RECOGNIZE ...
SELECT ... FROM pattern WHERE ...
...