Допустим, у меня есть следующая модель
public class Model {
private int ts;
private String dataField1;
private String dataField2;
private String dataField3;
// possibly many more, they might change in the future
}
У меня есть код Flink, который может преобразовывать DataStream[String]
в DataStream[Model]
. Теперь я хотел бы продолжить обработку этих данных, используя FlinkSQL
. Допустим, я хочу получить такой запрос
SELECT COUNT(DISTINCT dataField1) FROM [tableName]
GROUP BY dataField2, TUMBLE(ts, INTERVAL '15' MINUTE)
Конечно, это не сработает, поскольку Flink требует, чтобы ts
было TIMESTAMP
, а в настоящее время это BIGINT
. Поэтому мы должны каким-то образом проинструктировать Flink использовать здесь время события (средство извлечения водяных знаков уже является частью существующего DataStream
). Я придумал следующее
Table inputTable = tableEnv.fromDataStream(inputStream, "ts.rowtime, dataField1, dataField2, ..."
Это работает нормально, но я действительно не хочу повторять здесь каждое поле POJO, просто чтобы отметка времени была помечена как rowtime
. Я думаю, что можно добавить еще один шаг + ввести новый POJO, который преобразовал бы мой Model
в
public class TableModel {
private Timestamp ts;
private String dataField1;
private String dataField2;
private String dataField3;
// ...
}
Но мне это тоже не нравится ... Есть ли более чистый способ преобразования BIGINT
в TIMESTAMP
без такого шаблона?