Apache Flink два типа окон, времени и "счетчика" окон - PullRequest
0 голосов
/ 23 июня 2018

Я пытаюсь работать с файлом как поток с окном.

Вот код

object Prog {

  def main(args: Array[String]) : Unit = {
    org.apache.log4j.BasicConfigurator.configure()

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    val csvTableSource = CsvTableSource
      .builder
      .path("src/main/resources/data.stream")
      .field("numPers", Types.INT)
      .field("TIMESTAMP", Types.STRING)
      .fieldDelimiter(",")
      .ignoreFirstLine
      .ignoreParseErrors
      .commentPrefix("%")
      .build()

    tableEnv.registerTableSource("Data", csvTableSource)

    val table = tableEnv.scan("Data")
      .filter("numPers > 10")
      .select("*")

    val ds = tableEnv.toAppendStream(table, classOf[Row])

    ds.print()
    env.execute()
  }
}

Вопрос в том, как реализовать здесь окно, например, чтобы показывать только значения, которые не старше одного часа. Или второй тип окна, когда я читаю, скажем, 50 последних записей.

1 Ответ

0 голосов
/ 26 июня 2018

В потоковой обработке окна - это группы, по которым вычисляются агрегаты.

Ваш вариант использования, кажется, отличается.Если вы хотите сохранить последние x минуты или y последние записи, то это нужно по-разному выразить в SQL.

Сохранение последних 5 минут будет выглядеть примерно так:

SELECT * FROM Data d WHERE d.tstamp >  (now() - INTERVAL '5' MINUTE)

Итак, это будет фильтр для какого-либо атрибута отметки времени.

Сохранение последних 10 строк будет

SELECT * FROM Data d ORDER BY d.tstamp DESC LIMIT 10

Однако ,Flink (версия 1.5) SQL или Table API не поддерживает ни одну из этих операций.

...