Наличие эквивалента HOP_START внутри примитива агрегации в Flink - PullRequest
0 голосов
/ 13 февраля 2019

Я пытаюсь сделать экспоненциально убывающую скользящую среднюю по скользящему окну в Flink SQL.Мне нужен доступ к одной из границ окна, HOP_START в следующем:

SELECT                                                                              
  lb_index one_key,
-- I have access to this one:
  HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
-- Aggregation primitive:
  SUM(
    Y * EXP(TIMESTAMPDIFF(
      SECOND, 
      proctime, 
-- This one throws:
      HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
  )))
FROM write_position                                                                
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

Я получаю следующую трассировку стека:

11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Aggregate(groupBy: (lb_index), window: (SlidingGroupWindow('w$, 'proctime, 5000.millis, 50.millis)), select: (lb_index, SUM($f2) AS Y, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Calc(select: (lb_index, proctime, *(payload.Y, EXP(/(CAST(/INT(Reinterpret(-(HOP_START(PROCTIME(proctime), 50, 5000), PROCTIME(proctime))), 1000)), 1000))) AS $f2))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using rel#459:DataStreamScan.DATASTREAM.true.Acc(table=[_DataStreamTable_0])
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Unsupported call: HOP_START 
If you think this function should be supported, you can create an issue and start a discussion for it.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)

Это говоритявляется ли он нереализованным, пока он работает вне агрегирующей суммы.Это то, что заставляет меня думать, что это проблема с областью видимости.

Теперь дело в том, что я могу преобразовать это выражение и выполнить окончательную обработку вне агрегации, так как exp (x + y) = exp (x)* ехр (у);Но я застрял с использованием TIMESTAMPDIFF (, который сделал чудеса в моем предыдущем выпуске ).Я не нашел способа приведения АТРИБУТОВ ВРЕМЕНИ к ЧИСЛОВЫМ типам;Кроме того, мне неудобно ставить отметки времени в UNIX, даже если я уменьшу их.

В любом случае, этот обходной путь может быть неуклюжим и может привести меня к другому пути.Я не знаю, как я мог бы втиснуть области в этом фрагменте SQL, чтобы они все еще находились в области видимости окна и имели время начала без выброса.

1 Ответ

0 голосов
/ 16 февраля 2019

Я предлагаю вам поэкспериментировать с HOP_PROCTIME (), а не с HOP_START ().Различия объяснены здесь , но эффект будет в том, что у вас будет атрибут proctime, а не метка времени, что, я надеюсь, порадует TIMESTAMPDIFF.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...