Я настроил обработку времени события и у меня есть связанный поток с CoFlatMapFunction
.Я пишу тестовый пример, но вижу, что FlatMapFunction
не вызывает его методы flatMap1()
и flatMap2()
с событиями в порядке событий и времени.
Какой-то псевдокод для уточнения
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
DataStream<Integer> evenStream = env.addSource(new SourceFunction<Integer>(){
public void run(SourceContext<Integer> ctxt){
for (i=0; i < 20; i=i+2){
ctxt.collectWithTimestamp(i, i);
ctxt.emitWatermark(i);
}
}
}
)
DataStream<Integer> oddStream = env.addSource(new SourceFunction<Integer>(){
public void run(SourceContext<Integer> ctxt){
for (i=1; i < 21; i=i+2){
ctxt.collectWithTimestamp(i, i); // Using i as timestamp and watermark for this sample code, but in real code, I am using using timestamp of real event
ctxt.emitWatermark(i);
}
}
}
)
evenStream
.connect(oddStream)
.flatMap(new CoFlatMapFunction<Integer, Integer, Integer>(){
public void flatMap1(Integer evenNumber, Collector<Integer> out){
System.out.println(evenNumber);
}
public void flatMap2(Integer oddNumber, Collector<Integer> out){
System.out.println(oddNumber);
}
}
);
Когда я запускаю это, я ожидаю, что он напечатает:
0,1,2,3,4 .... 21
Это потому, чтоЯ устанавливаю метку времени четных и нечетных чисел поочередноДругими словами, 0 имеет самую низкую отметку времени, затем 1, затем 2 и т. Д.
Но сначала печатаются все четные числа, за которыми следуют нечетные числа.
Итак, я ожидал, что flatMap1() и flatMap2 () должны вызываться в порядке временных меток, которые я установил в событиях.Но этого не происходит.