У меня есть таблица Flink со следующими столбцами: final String[] hNames = {"mID", "dateTime", "mValue", "unixDateTime", "mType"};
Я хочу создать DataStream в Apache Flink, который делает падающие окна длиной 2 каждого и вычисляет среднее значение mValue
для этого окна. Ниже я использовал функцию SUM
, поскольку кажется, что функции AVG
нет. Эти окна должны быть сгруппированы по столбцу mID
(является целым числом) или dateTime
. Я ставлю окна в столбец mType
, поскольку они представляют собой определенную группу данных.
Другая проблема, с которой я сталкиваюсь, заключается в том, что данные, которые я использую в этом приложении, взяты из файла CSV. Так что это не данные в реальном времени. Проблема в том, что Flink случайным образом упорядочивает эти данные. Я хочу, чтобы он сортировался по возрастанию в столбце mID или dateTime.
Приведенный ниже код ничего не печатает. Что я здесь не так делаю? Странная вещь, когда я заменяю countWindow()
функцию на countWindowAll()
, тогда я получаю вывод.
final String[] hColumnNames = {"mID", "dateTime", "mValue", "unixDateTime", "mType"};
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv);
TableSource csvSource = CsvTableSource.builder()
.path("path")
.fieldDelimiter(";")
.field(hColumnNames[0], Types.INT())
.field(hColumnNames[1], Types.SQL_TIMESTAMP())
.field(hColumnNames[2], Types.DOUBLE())
.field(hColumnNames[3], Types.LONG())
.field(hColumnNames[4], Types.STRING())
.build();
//Register the TableSource
tableEnv.registerTableSource("H", csvSource);
Table HTable = tableEnv.scan("H");
tableEnv.registerTable("HTable", HTable);
DataStream<Row> stream = tableEnv.toAppendStream(HTable, Row.class);
TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType = new TupleTypeInfo<>(
Types.INT(),
Types.SQL_TIMESTAMP(),
Types.DOUBLE(),
Types.LONG(),
Types.STRING());
DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple =
tableEnv.toAppendStream(HTable, tupleType);
//What is going wrong below???
DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple1 = dsTuple
.keyBy(4)
.countWindow(2)
.sum(3)
;
try {
fsEnv.execute();
} catch (Exception e) {
e.printStackTrace();
}