Преобразование DataStream в таблицу + атрибут времени события отметки - PullRequest
0 голосов
/ 25 мая 2020

Допустим, у меня есть следующая модель

public class Model {
  private int ts;

  private String dataField1;
  private String dataField2;
  private String dataField3;
  // possibly many more, they might change in the future
}

У меня есть код Flink, который может преобразовывать DataStream[String] в DataStream[Model]. Теперь я хотел бы продолжить обработку этих данных, используя FlinkSQL. Допустим, я хочу получить такой запрос

SELECT COUNT(DISTINCT dataField1) FROM [tableName]
  GROUP BY dataField2, TUMBLE(ts, INTERVAL '15' MINUTE)

Конечно, это не сработает, поскольку Flink требует, чтобы ts было TIMESTAMP, а в настоящее время это BIGINT. Поэтому мы должны каким-то образом проинструктировать Flink использовать здесь время события (средство извлечения водяных знаков уже является частью существующего DataStream). Я придумал следующее

Table inputTable = tableEnv.fromDataStream(inputStream, "ts.rowtime, dataField1, dataField2, ..."

Это работает нормально, но я действительно не хочу повторять здесь каждое поле POJO, просто чтобы отметка времени была помечена как rowtime. Я думаю, что можно добавить еще один шаг + ввести новый POJO, который преобразовал бы мой Model в

public class TableModel {
  private Timestamp ts;

  private String dataField1;
  private String dataField2;
  private String dataField3;
  // ...
}

Но мне это тоже не нравится ... Есть ли более чистый способ преобразования BIGINT в TIMESTAMP без такого шаблона?

...