Мы читаем данные из kafka, сообщение можно упростить до Tuple2, здесь String - это ключ, а Integer - это тип (может быть 1, 2, 3), что-то вроде
('key001', 1)
('key001', 2)
('key001', 3)
('key001', 3)
('key002', 1)
('key002', 2)
('key003', 1)
('key004', 1)
Мы хотим получить некоторую статистическую информацию, за 10 минутное окно,
- подсчитать ключи, имеющие тип 1,
- подсчитать ключи, которые имеют как тип 1, так и тип 2,
- считайте ключи, у которых есть все 3 типа
Я попробовал приведенный ниже код, кажется, работает, но выглядело извращенным для меня, это правильный путь?
Я должен использовать здесь временное окно дважды, так как использование одного временного окна не дает того, что я хочу, я до сих пор не понимаю, как оно работает, может кто-нибудь объяснить, что происходит, если применить несколько временных оконв поток?
SingleOutputStreamOperator<Tuple2<String, Long>> x = ds.keyBy(0)
.timeWindow(Time.seconds(600))
.process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Long>, Tuple, TimeWindow>() {
@Override
public void process(Tuple key,
ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Long>, Tuple, TimeWindow>.Context ctx,
Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Long>> out) throws Exception {
boolean hasType1 = false;
boolean hasType2 = false;
boolean hasType3 = false;
for (Tuple2<String, Integer> t2 : elements) {
if (t2.f1 == 1) {
if (!hasType1) {
hasType1 = true;
}
} else if (t2.f1 == 2) {
if (!hasType2) {
hasType2 = true;
}
} else if (t2.f1 == 3) {
if (!hasType3) {
hasType3 = true;
}
}
//
if (hasType1 && hasType2 && hasType3) {
break;
}
}
if (hasType1) {
out.collect(new Tuple2<>("hasType1",1L));
if (hasType2) {
out.collect(new Tuple2<>("hasType1_Type2",1L));
if (hasType3) {
out.collect(new Tuple2<>("hasType1_Type2_Type3",1L));
}
}
}
}
});
x.keyBy(0).timeWindow(Time.seconds(600)).sum(1).map(new MapFunction<Tuple2<String, Long>, String>(){
@Override
public String map(Tuple2<String, Long> value) throws Exception {
return value.f0 + " = " + value.f1;
}
}).addSink(new BucketingSink<String>("hdfs://...")).setParallelism(1);