Я хочу получать и обрабатывать три потока в одном операторе. Например, код, реализованный в Storm , выглядит следующим образом:
builder.setBolt("C_bolt", C_bolt(), parallelism_hint)
.fieldsGrouping("A_bolt", "TRAINING", new Fields("word"))
.fieldsGrouping("B_bolt", "ANALYSIS", new Fields("word"))
.allGrouping("A_bolt", "SUM");
В Flink реализована обработка SUM stream(A_bolt's SideOutput)
и TRAINING stream(A_bolt)
:
SingleOutputStreamOperator<Tuple3<String, Integer, Boolean>> A_bolt;
DataStream<Tuple2<Integer, Integer>> Sum = A_bolt.getSideOutput(outputTag).broadcast();
DataStream<Tuple3<String, String, Integer>> B_bolt;
DataStream<String> C_bolt= A_bolt
.keyBy(new KeySelector<Tuple3<String,Integer,Boolean>, String>() {
@Override
public String getKey(Tuple3<String,Integer,Boolean> in) throws Exception {
return in.f0;
}
})
.connect(Sum)
.flatMap(new Process())
.setParallelism(parallelism);
Но я не знаю, как добавить ANALYSIS stream(B_bolt)
. Спасибо за вашу помощь.