Выход Flink Side для скользящего временного окна - PullRequest
0 голосов
/ 05 сентября 2018

У меня есть следующий конвейер Flink, который просто считает элементы в окне и сообщает в отдельном потоке поздние элементы

OutputTag<Tuple3<Long, String, Double>> lateItems= new OutputTag<Tuple3<Long, String, Double>>("Late Items"){};
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

env.setParallelism(1);
env.getConfig().setAutoWatermarkInterval(-1);
DataStream<Tuple3<Long,String,Double>> stream = env.addSource(new  YetAnotherSource(fileName));
DataStream<Tuple3<Long,String,Double>> lateStream;
AllWindowedStream<Tuple3<Long, String, Double>, TimeWindow> tuple3TimeWindowAllWindowedStream = stream.windowAll(SlidingEventTimeWindows.of(Time.milliseconds(100), Time.milliseconds(10)));
tuple3TimeWindowAllWindowedStream.sideOutputLateData(lateItems);
lateStream = streamOfResults.getSideOutput(lateItems);
            lateStream.countWindowAll(1).apply(new CounterFunction22()).writeAsText("FlinkSlidingTimeWindowLateItemsResult.txt",FileSystem.WriteMode.OVERWRITE);
streamOfResults.writeAsText("FlinkSlidingTimeWindowOutputFor" + fileName + ".txt", FileSystem.WriteMode.OVERWRITE);

Когда я передаю в качестве ввода следующие данные

1383451269002,A,22
1383451269006,A,18
1383451269007,A,18
*1383451269010,W,0
1383451269008,A,18
1383451269027,A,20
1383451269028,A,19
1383451269033,A,17
1383451269033,A,17
1383451269030,A,17
*1383451269038,W,0
1383451269008,A,17

Элементы с * являются водяными знаками.

Я получаю, как и ожидалось, в результате первое окно содержит три элемента. Это связано с тем, что элементы в пятой и последней строках считаются опоздавшими для этого окна.

(1383451268910,1383451269010,3)

Однако на боковом выходе ничего не генерируется.

Когда я использую окно сеанса, поздние элементы генерируются на боковом выходе.

Есть идеи, почему ничего не генерируется для скользящего временного окна?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...