Таблица потоковой передачи Flink с использованием источника kafka и использованием flink sql для запроса - PullRequest
0 голосов
/ 17 июня 2020

Я пытаюсь прочитать данные из kafka topi c в DataStream и зарегистрировать DataStream , после этого используйте TableEnvironment.sqlQuery ("SQL") для запроса данных, когда TableEnvironment.execute () нет ошибки и нет вывода.

public static void main(String[] args){
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   env.setStreamTimeCharacteristic(TimeCharateristic.EventTime);
   env.enableCheckpointing(5000);
   StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
   FlinkKafkaConsumer<Person> consumer = new FlinkKafkaConsumer(
                                                                "topic",
                                                                 new JSONDeserializer(),
                                                                 Job.getKafkaProperties
                                                               );

  consumer.setStartFromEarliest();
  DataStream<Person> stream = env.addSource(consumer).fliter(x -> x.status != -1).assignTimestampAndWatermarks(new AssignerWithPeriodicWatermarks<Person>(){
      long current = 0L;
      final long expire = 1000L;

      @Override
      public Watermakr getCurrentWatermark(){
         return new Watermark(current - expire);
      }

      @Override
      public long extractTimestamp(Person person){
         long timestamp = person.createTime;
         current = Math.max(timestamp,current);
         return timestamp;
      }
  });
  //set createTime as rowtime
  tableEnvironment.registerDataStream("Table_Person",stream,"name,age,sex,createTime.rowtime");
  Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
  tableEnvironment.toAppendStream(t,Types.Row(new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.LONG})).print();
tableEnvironment.execute("person-query");
}

когда я выполняю, на консоли ничего не печаталось или не было исключения; но если я использую fromCollection () в качестве источника, программа напечатает что-нибудь на консоли; Не могли бы вы посоветовать мне исправить это?

зависимости:

  1. flink-streaming-java_2.11 версия: 1.9.0-csa1.0.0.0;
  2. версия flink-streaming-scala_2.11: 1.9.0-csa1.0.0.0;
  3. версия flink-connector-kafka_2.11: 1.9.0-csa1.0.0.0;
  4. flink-table-api- java -bridge_2.11 версия: 1.9.0-csa1.0.0.0;
  5. flink-table-planner_2.11 версия: 1.9.0-csa1.0.0.0 ;

1 Ответ

0 голосов
/ 17 июня 2020

В коде, где вы конвертируете результат запроса SQL обратно в DataStream, вам нужно передать res, а не t в toAppendStream. (Я не вижу, как код, который вы опубликовали, будет даже компилироваться - где объявлено t?) И я думаю, вы сможете сделать это

Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
tableEnvironment.toAppendStream(res,Row.class).print();

вместо того, чтобы возиться с TypeInformation.

...