FLINK - как обработать logi c по результату запроса sql - PullRequest
0 голосов
/ 04 августа 2020

Мое требование - обработать или построить несколько c logi * вокруг результата запроса sql во flink. Для простоты скажем, у меня есть два запроса sql, которые они выполняют с разным размером окна и один поток событий. Мой вопрос:

  • a) как я узнаю, для какого результата запроса это
  • b) как я узнаю, сколько строки являются результатом выполненного запроса? Мне нужна эта информация, так как мне нужно создать уведомление со списком событий, которые являются частью результата запроса.
DataStream<Event> ds = ...        
String query = "select id, key" +
                "  from  eventTable  GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ";

        String query1 = "select id, key" +
                "  from  eventTable  GROUP BY TUMBLE(rowTime, INTERVAL '1' DAY), id, key ";
        List<String> list = new ArrayList<>();
        list.add(query);
        list.add(query1);
       
        tabEnv.createTemporaryView("eventTable", ds, $("id"), $("timeLong"), $("key"),$("rowTime").rowtime());


        for(int i =0; i< list.size(); i++ ){
            Table result = tabEnv.sqlQuery(list.get(i));
            DataStream<Tuple2<Boolean, Row>> dsRow = tabEnv.toRetractStream(result, Row.class);
            dsRow.process(new ProcessFunction<Tuple2<Boolean, Row>, Object>() {

            List<Row> listRow = new ArrayList<>();
            @Override
            public void processElement(Tuple2<Boolean, Row> booleanRowTuple2, Context context, Collector<Object> collector) throws Exception {
                listRow.add(booleanRowTuple2.f1);
            }
            });
        }

Благодарю за вашу помощь. спасибо Ашуто sh

1 Ответ

1 голос
/ 05 августа 2020

Чтобы отсортировать результаты по какому запросу, вы можете включить идентификатор для каждого запроса в сами запросы, например,

SELECT '10sec', id, key FROM eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key

Определить количество строк в таблице результатов сложнее. Одна из проблем заключается в том, что нет окончательного ответа на количество результатов потокового запроса. Но там, где вы обрабатываете результаты, кажется, что вы можете подсчитать количество строк.

Или, и я не пробовал это, но, возможно, вы могли бы использовать что-то вроде row_number() over(order by tumble_rowtime(rowTime, interval '10' second)) для аннотации каждой строки результат со счетчиком.

...