Apache flink 1.52 Rowtime отметка времени равна нулю - PullRequest
0 голосов
/ 17 сентября 2018

Я делаю запрос со следующим кодом:

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    DataStream<Row> ds = SourceHelp.builder().env(env).consumer010(MyKafka.builder().build().kafkaWithWaterMark2())
            .rowTypeInfo(MyRowType.builder().build().typeInfo())
            .build().source4();
    //,proctime.proctime,rowtime.rowtime
    String sql1 = "select a,b,max(rowtime)as rowtime from user_device group by a,b";
    DataStream<Row> ds2 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device").fields("a,b,rowtime.rowtime")
            .rowTypeInfo(MyRowType.builder().build().typeInfo13())
            .sql(sql1).in(ds).build().result();

    ds2.print();
    // String sql2 = "select a,count(b) as b from user_device2 group by a";
    String sql2 = "select a,count(b) as b,HOP_END(rowtime,INTERVAL '5' SECOND,INTERVAL '30' SECOND) as c from user_device2 group by HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '30' SECOND),a";
    DataStream<Row> ds3 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device2").fields("a,b,rowtime.rowtime")
            .rowTypeInfo(MyRowType.builder().build().typeInfo14())
            .sql(sql2).in(ds2).build().result();

    ds3.print();
    env.execute("test");

примечание: для sql1 я использую функцию max с rowtime, она не работает, и выдается следующее исключение:

Исключение в теме "главная" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: временная метка Rowtime является нулевой. Пожалуйста, сделай убедитесь, что правильный TimestampAssigner определен и поток среда использует временную характеристику EventTime. в org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking (MiniCluster.java:625) в org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute (LocalStreamEnvironment.java:123) в com.aicaigroup.water.WaterTest.testRowtimeWithMoreSqls5 (WaterTest.java:158) at com.aicaigroup.water.WaterTest.main (WaterTest.java:20). Причина: java.lang.RuntimeException: временная метка Rowtime является нулевой. Пожалуйста, сделай убедитесь, что правильный TimestampAssigner определен и поток среда использует временную характеристику EventTime. в DataStreamSourceConversion $ 24.processElement (Неизвестный источник) в org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement (CRowOutputProcessRunner.scala: 67) в org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.pushToOperator (OperatorChain.java:558) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:533) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:513) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ BroadcastingOutputCollector.collect (OperatorChain.java:628) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ BroadcastingOutputCollector.collect (OperatorChain.java:581) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:679) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:657) в org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51) в com.aicaigroup.TableHelp $ 1.processElement (TableHelp.java:42) в com.aicaigroup.TableHelp $ 1.processElement (TableHelp.java:39) в org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.pushToOperator (OperatorChain.java:558) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:533) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:513) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:679) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:657) в org.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.pushToOperator (OperatorChain.java:558) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:533) в org.apache.flink.streaming.runtime.tasks.OperatorChain $ CopyingChainingOutput.collect (OperatorChain.java:513) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:679) в org.apache.flink.streaming.api.operators.AbstractStreamOperator $ CountingOutput.collect (AbstractStreamOperator.java:657) в org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)в org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement (GroupAggProcessFunction.scala: 151) в org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement (GroupAggProcessFunction.scgap. 39 at).flink.streaming.api.operators..runtime.tasks.OneInputStreamTask.run (OneInputStreamTask.java:104) при org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:306) при org.apache.flink.runtime.taskmanager.Task.run (Task.java:703) на java.lang.Thread.run (Thread.java:748) 2018-09-17 09: 51: 53.679 [Сборщик Кафки 0.10 для источника: Пользовательский источник -> Карта -> из:(a, b, время строки) -> выберите: (a, b, CAST (время строки) AS время строки) (2/8)] INFO oakafka.clients.consumer.internals.AbstractCoordinator - обнаруженный координатор 172.16.11.91:9092 (id: 2147483647 rack: null) для группового теста.

, затем я попытался обновить sql1 следующим образом: «выберите a, b, rowtime from user_device», и это работает.Так как исправить ошибку?Первый sql должен использовать group by, а второй sql должен использовать rowtime by timeWindow.3QS

1 Ответ

0 голосов
/ 14 ноября 2018

Я начал вздрагивать с 1.6, встречайте такой же вопрос, как у вас.Решено этими шагами:

  • с использованием assignTimestampsAndWatermarks, просто используйте стандартную и обычную реализацию BoundedOutOfOrdernessTimestampExtractor.Вам нужно написать функцию extractTimestamp, чтобы извлечь значение метки времени и объявить интервал окна в конструкторе.
  • append, proctime.proctime, rowtime.rowtime в конце полей (я использую fromDataStream (Flink 1.6) для преобразованияstream as table)
  • , если вы хотите использовать существующее поле как rowtime.Например, поля источника данных - это «a, clicktime, c», вы можете объявить «a, clicktime.rowtime, c»

Если это поможет вам.

...