Временные окна обработки не работают на конечных источниках данных в Apache Flink - PullRequest
0 голосов
/ 23 сентября 2018

Я пытаюсь применить очень простую оконную функцию к конечному потоку данных в Apache Flink ( локально, без кластера ).Вот пример:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))

  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .trigger(ProcessingTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }
  })

  .print()

env.execute()

Здесь я пытаюсь сгруппировать все элементы, которые поступают в окно в течение секунды, а затем просто распечатать эти группы.

Я предполагал, что все элементы будут произведены намного меньше чем за одну секунду и попадут в одно окно, поэтому в print() будет один входящий элемент.Тем не менее, вообще ничего не печатается , когда я запускаю это.

Если я удаляю все элементы управления окнами, например

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .print()

, я вижу элементы, напечатанные после запуска,Я также попробовал это с источником файла, без разницы.

Параллелизм по умолчанию на моей машине - 6. Если я экспериментирую с уровнем параллелизма и задержками, как это

val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .map { x => Thread.sleep(1500); x }

Iвозможность вывести некоторые - не все - элементы в группы, которые выводятся на печать.

Мое первое предположение состоит в том, что источник завершает работу намного быстрее, чем через 1 секунду, и задача закрывается до срабатывания таймера окна.Отладка показала, что достигнута строка установки таймера в ProcessingTimeTrigger.Не должны ли все запущенные таймеры завершиться до завершения задачи (по крайней мере, такое впечатление я получил от кода )?

Не могли бы вы помочь мне понять это и сделать это более детерминированным?

Обновление № 1, 23.09.2018:

Я также экспериментировал с временными окнами событий вместо временных окон обработки.Если я сделаю это:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env
  .fromCollection(List("a", "b", "c", "d", "e"))
  .assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
    override def extractAscendingTimestamp(element: String): Long = {
      element.charAt(0).toInt
    }
  })

  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .trigger(EventTimeTrigger.create)
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.toString())
    }
  })

  .print()

env.execute()

Тогда снова ничего не печатается.Отладчик показывает, что триггер onElement вызывается для каждого элемента, но onEventTime никогда не вызывается.

Кроме того, если я изменю средство извлечения метки времени, чтобы сделать более крупные шаги:

element.charAt(0).toInt * 1000

печатаются все элементы (по одному элементу на группу, что ожидается), кроме последнего.

Обновление № 2, 23.09.2018:

ОбновлениеОтвет # 1 в этом комментарии .

1 Ответ

0 голосов
/ 23 сентября 2018

Когда конечный источник достигает конца, если вы используете время события, то будет введен водяной знак с меткой времени Long.MAX_VALUE, что вызовет срабатывание всех таймеров времени события.Однако со временем обработки Flink будет ждать, пока все таймеры запуска завершат свои действия, и затем выйдет.

Как вы и подозревали, вы не видите никаких выходных данных, потому что источник завершается очень быстро.

Детерминированное поведение является прямым с обработкой времени события;со временем обработки это не реально достижимо.

Но вот хак, который более или менее работает:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val s = env.fromCollection(List("a", "b", "c", "d", "e"))
val t = env.addSource((context: SourceContext[String]) => {
  while(true) {
    Thread.sleep(100)
    context.collect("dummy")
  }
})

s.union(t)
  .filter(_ != "dummy")
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
  .process(new ProcessAllWindowFunction[String, String, TimeWindow] {
    override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
      out.collect(elements.toList.sorted.toString())
    }
  })
  .print()

env.execute()
...