Я делаю запрос со следующим кодом:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Row> ds = SourceHelp.builder().env(env).consumer010(MyKafka.builder().build().kafkaWithWaterMark2())
.rowTypeInfo(MyRowType.builder().build().typeInfo())
.build().source4();
//,proctime.proctime,rowtime.rowtime
String sql1 = "select a,b,max(rowtime)as rowtime from user_device group by a,b";
DataStream<Row> ds2 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device").fields("a,b,rowtime.rowtime")
.rowTypeInfo(MyRowType.builder().build().typeInfo13())
.sql(sql1).in(ds).build().result();
ds2.print();
// String sql2 = "select a,count(b) as b from user_device2 group by a";
String sql2 = "select a,count(b) as b,HOP_END(rowtime,INTERVAL '5' SECOND,INTERVAL '30' SECOND) as c from user_device2 group by HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '30' SECOND),a";
DataStream<Row> ds3 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device2").fields("a,b,rowtime.rowtime")
.rowTypeInfo(MyRowType.builder().build().typeInfo14())
.sql(sql2).in(ds2).build().result();
ds3.print();
env.execute("test");
примечание: для sql1 я использую функцию max с rowtime, она не работает, и выдается следующее исключение:
Исключение в теме "главная"
org.apache.flink.runtime.client.JobExecutionException:
java.lang.RuntimeException: временная метка Rowtime является нулевой. Пожалуйста, сделай
убедитесь, что правильный TimestampAssigner определен и поток
среда использует временную характеристику EventTime. в
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking (MiniCluster.java:625)
в
org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute (LocalStreamEnvironment.java:123)
в
com.aicaigroup.water.WaterTest.testRowtimeWithMoreSqls5 (WaterTest.java:158)
at com.aicaigroup.water.WaterTest.main (WaterTest.java:20). Причина:
java.lang.RuntimeException: временная метка Rowtime является нулевой. Пожалуйста, сделай
убедитесь, что правильный TimestampAssigner определен и поток
среда использует временную характеристику EventTime. в
DataStreamSourceConversion $ 24.processElement (Неизвестный источник) в
org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement (CRowOutputProcessRunner.scala: 67)
в
org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.pushToOperator (OperatorChain.java:558)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:533)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:513)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ BroadcastingOutputCollector.collect (OperatorChain.java:628)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ BroadcastingOutputCollector.collect (OperatorChain.java:581)
в
org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:679)
в
org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:657)
в
org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
в com.aicaigroup.TableHelp $ 1.processElement (TableHelp.java:42) в
com.aicaigroup.TableHelp $ 1.processElement (TableHelp.java:39) в
org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.pushToOperator (OperatorChain.java:558)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:533)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:513)
в
org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:679)
в
org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:657)
в
org.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.pushToOperator (OperatorChain.java:558)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:533)
в
org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:513)
в
org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:679)
в
org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:657)
в
org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)в org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement (GroupAggProcessFunction.scala: 151) в org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement (GroupAggProcessFunction.scgap. 39 at).flink.streaming.api.operators..runtime.tasks.OneInputStreamTask.run (OneInputStreamTask.java:104) при org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:306) при org.apache.flink.runtime.taskmanager.Task.run (Task.java:703) на java.lang.Thread.run (Thread.java:748) 2018-09-17 09: 51: 53.679 [Сборщик Кафки 0.10 для источника: Пользовательский источник -> Карта -> из:(a, b, время строки) -> выберите: (a, b, CAST (время строки) AS время строки) (2/8)] INFO oakafka.clients.consumer.internals.AbstractCoordinator - обнаруженный координатор 172.16.11.91:9092 (id: 2147483647 rack: null) для группового теста.
, затем я попытался обновить sql1 следующим образом: «выберите a, b, rowtime from user_device», и это работает.Так как исправить ошибку?Первый sql должен использовать group by, а второй sql должен использовать rowtime by timeWindow.3QS