Я хочу использовать Session
агрегирование окон, а затем запустить Tumble
агрегацию окон поверх полученного результата в Table API/Flink SQL
.
Можно ли изменить атрибут rowtime
после первого session
агрегирование, чтобы оно равнялось .rowtime
последнего наблюдаемого события в сеансе?
Я пытаюсь сделать что-то вроде этого:
table
.window(Session withGap 2.minutes on 'rowtime as 'w)
.groupBy('w, 'userId)
.select(
'userId,
('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
('w.rowtime - 2.minutes) as 'rowtime
)
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy('w)
.select(
'w.start,
'w.end,
'sessionDuration.avg as 'avgSession,
'sessionDuration.count as 'numberOfSession
)
Ключевая часть:
('w.rowtime - 2.minutes) as 'rowtime
Поэтому я хочу переназначить записи .rowtime
самого последнего события в сеансе, без перерыва в сеансе (2.minutes
в этом примере).
Этоотлично работает в BatchTable, однако не работает в StreamTable:
Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.
Да, я знаю, мне кажется, что я не хочу изобретать машину времени и менять порядок времени.Но реально ли как-то достичь описанного поведения?