Flink Table / SQL API: изменить атрибут rowtime после агрегации окна сеанса - PullRequest
0 голосов
/ 12 октября 2018

Я хочу использовать Session агрегирование окон, а затем запустить Tumble агрегацию окон поверх полученного результата в Table API/Flink SQL.

Можно ли изменить атрибут rowtime после первого session агрегирование, чтобы оно равнялось .rowtime последнего наблюдаемого события в сеансе?

Я пытаюсь сделать что-то вроде этого:

table
  .window(Session withGap 2.minutes on 'rowtime as 'w)
  .groupBy('w, 'userId)
  .select(
    'userId,
    ('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
    ('w.rowtime - 2.minutes) as 'rowtime
  )
  .window(Tumble over 5.minutes on 'rowtime as 'w)
  .groupBy('w)
  .select(
    'w.start,
    'w.end,
    'sessionDuration.avg as 'avgSession,
    'sessionDuration.count as 'numberOfSession
  )

Ключевая часть:

('w.rowtime - 2.minutes) as 'rowtime

Поэтому я хочу переназначить записи .rowtime самого последнего события в сеансе, без перерыва в сеансе (2.minutes в этом примере).

Этоотлично работает в BatchTable, однако не работает в StreamTable:

Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.

Да, я знаю, мне кажется, что я не хочу изобретать машину времени и менять порядок времени.Но реально ли как-то достичь описанного поведения?

1 Ответ

0 голосов
/ 12 октября 2018

Нет, к сожалению, вы не можете сделать это с помощью SQL или Table API в текущей версии (1.6.0).Как только вы изменяете атрибут времени (rowtime или proctime), он становится обычным атрибутом TIMESTAMP и теряет свои особые временные характеристики.

Для атрибутов rowtime причина в том, что мы не можем гарантировать, что отметка времени все еще совпадает с водяными знаками.В принципе, мы могли бы задержать водяные знаки на вычитаемый интервал времени, но это пока не поддерживается.

...