Я новичок во Flink, и я пытаюсь использовать его, чтобы получить несколько живых просмотров моего приложения.По крайней мере одно из динамических представлений, которые я хотел бы построить, было бы для отображения записей, которые не соответствовали SLA - или по существу истекли - и условием для этого было бы простое сравнение меток времени.Поэтому я бы хотел, чтобы запись в моей динамической таблице отображалась, если к ней недавно присоединилось событие NOT .Играя с Flink 1.6 (ограниченным этим из-за AWS Kinesis) в среде разработчиков, я не вижу, чтобы Flink переоценивал условие, если событие не касается этой записи.
У меня естьМоя среда разработки подключена к потоку Kinesis, который отправляет события журнала прямого доступа с веб-сервера.Это не мой реальный вариант использования, но его было легко начать тестировать.Я написал простой табличный запрос, который извлекает путь запроса, его время последнего доступа и вычисляет логический флаг, чтобы указать, не был ли к нему обращен в последнюю минуту.Я отлаживаю это с помощью потока отвода, подключенного к PrintSinkFunction, поэтому все обновления / удаления печатаются на моей консоли.
tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");
Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");
DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream .addSink(new PrintSinkFunction<>());
Я ожидаю, что при доступе к странице в этот поток отправляется событие Add.Затем, если я буду ждать 1 минуту (ничего не делать), оператор CASE в моей таблице будет иметь значение 1, поэтому я должен увидеть событие Delete, а затем Add с установленным флагом.
На самом деле я вижу, что ничегопроисходит, пока я не загружу эту страницу снова.На самом деле событие Delete имеет установленный флаг, в то время как следующее за ним событие Add снова очищается (как и должно быть, поскольку оно больше не "истекло).
// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event
Редактировать: самый полезный совет, который я 'В моем поиске мы натолкнулись на создание ProcessFunction . Я думаю, что это то, что я мог бы заставить работать с моими динамическими таблицами (в некоторых случаях я получал бы промежуточные потоки для просмотра вычисленных дат),но, надеюсь, к этому не нужно приходить.
Я получил подход ProcessFunction для работы, но он потребовал гораздо больше усилий, чем я изначально думал:
- Мне нужно было добавить поле в мой POJO, которое меняет метод onTimer () (это может быть дата или версия, которую вы просто ударяете каждый раз)
- Мне пришлось зарегистрировать это поле как часть динамической таблицы.
- Мне пришлось использовать это поле в своем запросе, чтобы запрос был переоценен и изменился логический флаг (даже если я на самом деле не использую новое поле).просто добавил его как часть моего предложения SELECT.