Теперь у нас есть SQL с причудливым управлением окнами во Flink, я пытаюсь, чтобы затухающая скользящая средняя называлась «что будет возможно в будущих выпусках Flink как для Table API, так и для SQL».из их дорожной карты SQL / превью 2017-03 пост :
table
.window(Slide over 1.hour every 1.second as 'w)
.groupBy('productId, 'w)
.select(
'w.end,
'productId,
('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)
Вот моя попытка (также вдохновленная примером распада кальцита ):
SELECT
lb_index one_key,
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
SUM(Y *
EXP(
proctime -
HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
))
FROM write_position
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
Время - это время обработки, которое мы получаем как proctime с созданием записи write_position из таблицы AppendStream следующим образом:
tEnv.registerTable(
"write_position",
tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))
Я получаю эту ошибку:
Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'.
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'
Я пробовал приводить proctime ко всем известным мне типам (в попытке достичь ЦУМНОЙ земли обетованной), и я просто не могу найти, как заставить это работать.
Я что-то упустил?Является ли proctime каким-то особым видом времени изменения системы, которое вы не можете преобразовать?Если так, то все равно должен быть какой-то способ сравнить его со значением HOP_START (proctime, ...).