Apache Flink - CoFlatMapFunction не обрабатывает события в порядке времени событий - PullRequest
0 голосов
/ 25 мая 2018

Я настроил обработку времени события и у меня есть связанный поток с CoFlatMapFunction.Я пишу тестовый пример, но вижу, что FlatMapFunction не вызывает его методы flatMap1() и flatMap2() с событиями в порядке событий и времени.

Какой-то псевдокод для уточнения

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.setParallelism(1)

DataStream<Integer> evenStream = env.addSource(new SourceFunction<Integer>(){
         public void run(SourceContext<Integer> ctxt){
                for (i=0; i < 20; i=i+2){
                     ctxt.collectWithTimestamp(i, i);
                     ctxt.emitWatermark(i);
                } 
         }
 }
 )
DataStream<Integer> oddStream  = env.addSource(new SourceFunction<Integer>(){
         public void run(SourceContext<Integer> ctxt){
                for (i=1; i < 21; i=i+2){
                     ctxt.collectWithTimestamp(i, i); // Using i as timestamp and watermark for this sample code, but in real code, I am using using timestamp of real event 
                     ctxt.emitWatermark(i);
                } 
         }
 }
 )

evenStream
   .connect(oddStream)
   .flatMap(new CoFlatMapFunction<Integer, Integer, Integer>(){

        public void flatMap1(Integer evenNumber, Collector<Integer> out){                  
               System.out.println(evenNumber);
        }
        public void flatMap2(Integer oddNumber, Collector<Integer> out){
               System.out.println(oddNumber);
        }

   }
   );

Когда я запускаю это, я ожидаю, что он напечатает:

0,1,2,3,4 .... 21

Это потому, чтоЯ устанавливаю метку времени четных и нечетных чисел поочередноДругими словами, 0 имеет самую низкую отметку времени, затем 1, затем 2 и т. Д.

Но сначала печатаются все четные числа, за которыми следуют нечетные числа.

Итак, я ожидал, что flatMap1() и flatMap2 () должны вызываться в порядке временных меток, которые я установил в событиях.Но этого не происходит.

1 Ответ

0 голосов
/ 26 мая 2018

Функции Flink (функции на связанных потоках) не дают гарантий, в каком порядке вызываются их методы.Методы (например, flatMap1() и flatMap2()) вызываются всякий раз, когда событие доступно из любого входа.В вашем примере объем данных, которые генерирует четный источник, слишком мал, так что все данные уже обрабатываются при поступлении нечетных чисел.

Итак, как работает обработка времени события для совместных функций?

Водяные знаки совместной функции всегда являются минимальными водяными знаками обоих входов.Для CoFlatMapFunction это не очень важно, потому что вы не можете ни прочитать текущий водяной знак, ни временные метки записей.Однако, с CoProcessFunction вы имеете доступ к обоим и можете регистрировать таймеры, которые вызываются, когда водяной знак достигает определенного момента времени.Если вы хотите отсортировать исходящий поток по времени события, вам нужно буферизовать входящие события (в состоянии), и, когда водяной знак прогрессирует, вы можете создавать все записи по порядку до времени, превышающего водяной знак.

...