Есть ли способ определить динамическую таблицу, состоящую из записей, которые не были затронуты событием в последнее время? - PullRequest
0 голосов
/ 20 мая 2019

Я новичок во Flink, и я пытаюсь использовать его, чтобы получить несколько живых просмотров моего приложения.По крайней мере одно из динамических представлений, которые я хотел бы построить, было бы для отображения записей, которые не соответствовали SLA - или по существу истекли - и условием для этого было бы простое сравнение меток времени.Поэтому я бы хотел, чтобы запись в моей динамической таблице отображалась, если к ней недавно присоединилось событие NOT .Играя с Flink 1.6 (ограниченным этим из-за AWS Kinesis) в среде разработчиков, я не вижу, чтобы Flink переоценивал условие, если событие не касается этой записи.

У меня естьМоя среда разработки подключена к потоку Kinesis, который отправляет события журнала прямого доступа с веб-сервера.Это не мой реальный вариант использования, но его было легко начать тестировать.Я написал простой табличный запрос, который извлекает путь запроса, его время последнего доступа и вычисляет логический флаг, чтобы указать, не был ли к нему обращен в последнюю минуту.Я отлаживаю это с помощью потока отвода, подключенного к PrintSinkFunction, поэтому все обновления / удаления печатаются на моей консоли.

tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");

Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");

DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream .addSink(new PrintSinkFunction<>());

Я ожидаю, что при доступе к странице в этот поток отправляется событие Add.Затем, если я буду ждать 1 минуту (ничего не делать), оператор CASE в моей таблице будет иметь значение 1, поэтому я должен увидеть событие Delete, а затем Add с установленным флагом.

На самом деле я вижу, что ничегопроисходит, пока я не загружу эту страницу снова.На самом деле событие Delete имеет установленный флаг, в то время как следующее за ним событие Add снова очищается (как и должно быть, поскольку оно больше не "истекло).

// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event

Редактировать: самый полезный совет, который я 'В моем поиске мы натолкнулись на создание ProcessFunction . Я думаю, что это то, что я мог бы заставить работать с моими динамическими таблицами (в некоторых случаях я получал бы промежуточные потоки для просмотра вычисленных дат),но, надеюсь, к этому не нужно приходить.

Я получил подход ProcessFunction для работы, но он потребовал гораздо больше усилий, чем я изначально думал:

  1. Мне нужно было добавить поле в мой POJO, которое меняет метод onTimer () (это может быть дата или версия, которую вы просто ударяете каждый раз)
  2. Мне пришлось зарегистрировать это поле как часть динамической таблицы.
  3. Мне пришлось использовать это поле в своем запросе, чтобы запрос был переоценен и изменился логический флаг (даже если я на самом деле не использую новое поле).просто добавил его как часть моего предложения SELECT.

1 Ответ

0 голосов
/ 27 мая 2019

Ваш подход выглядит многообещающе, но сравнение с меткой времени "сейчас" не поддерживается API таблиц Flink / SQL (пока).

Я бы решил эту проблему в два этапа.

  1. зарегистрируйте динамическую таблицу в режиме upsert, т. Е. Таблицу, которая передается по ключу (request в вашем случае) на основе метки времени версии (requestTime в вашем случае).Результирующая динамическая таблица будет содержать самую последнюю строку для каждого запроса.
  2. Иметь запрос с простым предикатом фильтра, подобным вашему, который сравнивает временную метку версии строк динамической таблицы (upsert) и отфильтровывает все строки, которыеимеют временные метки, которые слишком близки к текущему моменту.

К сожалению, ни одна из этих функций (преобразование в обратном направлении и сравнение с движущейся временной меткой "сейчас") пока недоступна в Flink.Тем не менее, в настоящее время ведутся некоторые работы по преобразованию таблиц upsert.

...