Я пытаюсь применить очень простую оконную функцию к конечному потоку данных в Apache Flink ( локально, без кластера ).Вот пример:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(ProcessingTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.sorted.toString())
}
})
.print()
env.execute()
Здесь я пытаюсь сгруппировать все элементы, которые поступают в окно в течение секунды, а затем просто распечатать эти группы.
Я предполагал, что все элементы будут произведены намного меньше чем за одну секунду и попадут в одно окно, поэтому в print()
будет один входящий элемент.Тем не менее, вообще ничего не печатается , когда я запускаю это.
Если я удаляю все элементы управления окнами, например
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.print()
, я вижу элементы, напечатанные после запуска,Я также попробовал это с источником файла, без разницы.
Параллелизм по умолчанию на моей машине - 6. Если я экспериментирую с уровнем параллелизма и задержками, как это
val env = StreamExecutionEnvironment.createLocalEnvironment(2)
env
.fromCollection(List("a", "b", "c", "d", "e"))
.map { x => Thread.sleep(1500); x }
Iвозможность вывести некоторые - не все - элементы в группы, которые выводятся на печать.
Мое первое предположение состоит в том, что источник завершает работу намного быстрее, чем через 1 секунду, и задача закрывается до срабатывания таймера окна.Отладка показала, что достигнута строка установки таймера в ProcessingTimeTrigger
.Не должны ли все запущенные таймеры завершиться до завершения задачи (по крайней мере, такое впечатление я получил от кода )?
Не могли бы вы помочь мне понять это и сделать это более детерминированным?
Обновление № 1, 23.09.2018:
Я также экспериментировал с временными окнами событий вместо временных окон обработки.Если я сделаю это:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env
.fromCollection(List("a", "b", "c", "d", "e"))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[String] {
override def extractAscendingTimestamp(element: String): Long = {
element.charAt(0).toInt
}
})
.windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
.trigger(EventTimeTrigger.create)
.process(new ProcessAllWindowFunction[String, String, TimeWindow] {
override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = {
out.collect(elements.toList.toString())
}
})
.print()
env.execute()
Тогда снова ничего не печатается.Отладчик показывает, что триггер onElement
вызывается для каждого элемента, но onEventTime
никогда не вызывается.
Кроме того, если я изменю средство извлечения метки времени, чтобы сделать более крупные шаги:
element.charAt(0).toInt * 1000
печатаются все элементы (по одному элементу на группу, что ожидается), кроме последнего.
Обновление № 2, 23.09.2018:
ОбновлениеОтвет # 1 в этом комментарии .