Флинк, несколько временных окон? - PullRequest
0 голосов
/ 19 марта 2019

Мы читаем данные из kafka, сообщение можно упростить до Tuple2, здесь String - это ключ, а Integer - это тип (может быть 1, 2, 3), что-то вроде

('key001', 1)
('key001', 2)
('key001', 3)
('key001', 3)
('key002', 1)
('key002', 2)
('key003', 1)
('key004', 1)

Мы хотим получить некоторую статистическую информацию, за 10 минутное окно,

  • подсчитать ключи, имеющие тип 1,
  • подсчитать ключи, которые имеют как тип 1, так и тип 2,
  • считайте ключи, у которых есть все 3 типа

Я попробовал приведенный ниже код, кажется, работает, но выглядело извращенным для меня, это правильный путь?

Я должен использовать здесь временное окно дважды, так как использование одного временного окна не дает того, что я хочу, я до сих пор не понимаю, как оно работает, может кто-нибудь объяснить, что происходит, если применить несколько временных оконв поток?

SingleOutputStreamOperator<Tuple2<String, Long>> x = ds.keyBy(0)
        .timeWindow(Time.seconds(600))
        .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Long>, Tuple, TimeWindow>() {

            @Override
            public void process(Tuple key,
                    ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Long>, Tuple, TimeWindow>.Context ctx,
                    Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Long>> out) throws Exception {

                boolean hasType1 = false;
                boolean hasType2 = false;
                boolean hasType3 = false;
                for (Tuple2<String, Integer> t2 : elements) {
                    if (t2.f1 == 1) {
                        if (!hasType1) {
                            hasType1 = true;
                        }
                    } else if (t2.f1 == 2) {
                        if (!hasType2) {
                            hasType2 = true;
                        }
                    } else if (t2.f1 == 3) {
                        if (!hasType3) {
                            hasType3 = true;
                        }
                    }
                    //
                    if (hasType1 && hasType2 && hasType3) {
                        break;
                    }
                }
                if (hasType1) {
                    out.collect(new Tuple2<>("hasType1",1L));
                    if (hasType2) {
                        out.collect(new Tuple2<>("hasType1_Type2",1L));
                        if (hasType3) {
                            out.collect(new Tuple2<>("hasType1_Type2_Type3",1L));
                        }
                    }
                }

            }

        });

x.keyBy(0).timeWindow(Time.seconds(600)).sum(1).map(new MapFunction<Tuple2<String, Long>, String>(){

    @Override
    public String map(Tuple2<String, Long> value) throws Exception {
        return value.f0 + " = " + value.f1;
    }

}).addSink(new BucketingSink<String>("hdfs://...")).setParallelism(1);
...