Мое требование - обработать или построить несколько c logi * вокруг результата запроса sql во flink. Для простоты скажем, у меня есть два запроса sql, которые они выполняют с разным размером окна и один поток событий. Мой вопрос:
- a) как я узнаю, для какого результата запроса это
- b) как я узнаю, сколько строки являются результатом выполненного запроса? Мне нужна эта информация, так как мне нужно создать уведомление со списком событий, которые являются частью результата запроса.
DataStream<Event> ds = ...
String query = "select id, key" +
" from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ";
String query1 = "select id, key" +
" from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '1' DAY), id, key ";
List<String> list = new ArrayList<>();
list.add(query);
list.add(query1);
tabEnv.createTemporaryView("eventTable", ds, $("id"), $("timeLong"), $("key"),$("rowTime").rowtime());
for(int i =0; i< list.size(); i++ ){
Table result = tabEnv.sqlQuery(list.get(i));
DataStream<Tuple2<Boolean, Row>> dsRow = tabEnv.toRetractStream(result, Row.class);
dsRow.process(new ProcessFunction<Tuple2<Boolean, Row>, Object>() {
List<Row> listRow = new ArrayList<>();
@Override
public void processElement(Tuple2<Boolean, Row> booleanRowTuple2, Context context, Collector<Object> collector) throws Exception {
listRow.add(booleanRowTuple2.f1);
}
});
}
Благодарю за вашу помощь. спасибо Ашуто sh