Я пытаюсь прочитать данные из kafka topi c в DataStream и зарегистрировать DataStream , после этого используйте TableEnvironment.sqlQuery ("SQL") для запроса данных, когда TableEnvironment.execute () нет ошибки и нет вывода.
public static void main(String[] args){
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharateristic.EventTime);
env.enableCheckpointing(5000);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
FlinkKafkaConsumer<Person> consumer = new FlinkKafkaConsumer(
"topic",
new JSONDeserializer(),
Job.getKafkaProperties
);
consumer.setStartFromEarliest();
DataStream<Person> stream = env.addSource(consumer).fliter(x -> x.status != -1).assignTimestampAndWatermarks(new AssignerWithPeriodicWatermarks<Person>(){
long current = 0L;
final long expire = 1000L;
@Override
public Watermakr getCurrentWatermark(){
return new Watermark(current - expire);
}
@Override
public long extractTimestamp(Person person){
long timestamp = person.createTime;
current = Math.max(timestamp,current);
return timestamp;
}
});
//set createTime as rowtime
tableEnvironment.registerDataStream("Table_Person",stream,"name,age,sex,createTime.rowtime");
Table res = tableEnvironment.sqlQuery("select TUMBLE_END(createTime,INTERVAL '1' minute) as registTime,sex,count(1) as total from Table_Person group by sex,TUMBLE(createTime,INTERVAL '1' minute)");
tableEnvironment.toAppendStream(t,Types.Row(new TypeInformation[]{Types.SQL_TIMESTAMP,Types.STRING,Types.LONG})).print();
tableEnvironment.execute("person-query");
}
когда я выполняю, на консоли ничего не печаталось или не было исключения; но если я использую fromCollection () в качестве источника, программа напечатает что-нибудь на консоли; Не могли бы вы посоветовать мне исправить это?
зависимости:
- flink-streaming-java_2.11 версия: 1.9.0-csa1.0.0.0;
- версия flink-streaming-scala_2.11: 1.9.0-csa1.0.0.0;
- версия flink-connector-kafka_2.11: 1.9.0-csa1.0.0.0;
- flink-table-api- java -bridge_2.11 версия: 1.9.0-csa1.0.0.0;
- flink-table-planner_2.11 версия: 1.9.0-csa1.0.0.0 ;