Конвертируйте Apache Flink Datastream в поток данных, который делает падающие окна из 2 событий и суммирует значение - PullRequest
0 голосов
/ 21 октября 2019

У меня есть таблица Flink со следующими столбцами: final String[] hNames = {"mID", "dateTime", "mValue", "unixDateTime", "mType"}; Я хочу создать DataStream в Apache Flink, который делает падающие окна длиной 2 каждого и вычисляет среднее значение mValue для этого окна. Ниже я использовал функцию SUM, поскольку кажется, что функции AVG нет. Эти окна должны быть сгруппированы по столбцу mID (является целым числом) или dateTime. Я ставлю окна в столбец mType, поскольку они представляют собой определенную группу данных.

Другая проблема, с которой я сталкиваюсь, заключается в том, что данные, которые я использую в этом приложении, взяты из файла CSV. Так что это не данные в реальном времени. Проблема в том, что Flink случайным образом упорядочивает эти данные. Я хочу, чтобы он сортировался по возрастанию в столбце mID или dateTime.

Приведенный ниже код ничего не печатает. Что я здесь не так делаю? Странная вещь, когда я заменяю countWindow() функцию на countWindowAll(), тогда я получаю вывод.

final String[] hColumnNames = {"mID", "dateTime", "mValue", "unixDateTime", "mType"};

  StreamExecutionEnvironment.getExecutionEnvironment();
 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv);   


        TableSource csvSource = CsvTableSource.builder()
                .path("path")
                .fieldDelimiter(";")
                .field(hColumnNames[0], Types.INT())
                .field(hColumnNames[1], Types.SQL_TIMESTAMP())
                .field(hColumnNames[2], Types.DOUBLE())
                .field(hColumnNames[3], Types.LONG())
                .field(hColumnNames[4], Types.STRING())
                .build();

        //Register the TableSource 
        tableEnv.registerTableSource("H", csvSource);
        Table HTable = tableEnv.scan("H");
        tableEnv.registerTable("HTable", HTable);

        DataStream<Row> stream = tableEnv.toAppendStream(HTable, Row.class);

        TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType = new TupleTypeInfo<>(
        Types.INT(),
        Types.SQL_TIMESTAMP(),
        Types.DOUBLE(),
        Types.LONG(),
        Types.STRING());
        DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple =
                tableEnv.toAppendStream(HTable, tupleType);


//What is going wrong below???
        DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple1 = dsTuple
                .keyBy(4)
                .countWindow(2)
                .sum(3)
                ;

        try {
            fsEnv.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
...