Apache Flink: как обрабатывать три потока - PullRequest
0 голосов
/ 08 ноября 2018

Я хочу получать и обрабатывать три потока в одном операторе. Например, код, реализованный в Storm , выглядит следующим образом:

builder.setBolt("C_bolt", C_bolt(), parallelism_hint) .fieldsGrouping("A_bolt", "TRAINING", new Fields("word")) .fieldsGrouping("B_bolt", "ANALYSIS", new Fields("word")) .allGrouping("A_bolt", "SUM");

В Flink реализована обработка SUM stream(A_bolt's SideOutput) и TRAINING stream(A_bolt):

SingleOutputStreamOperator<Tuple3<String, Integer, Boolean>> A_bolt;
DataStream<Tuple2<Integer, Integer>> Sum = A_bolt.getSideOutput(outputTag).broadcast();
DataStream<Tuple3<String, String, Integer>> B_bolt;
DataStream<String> C_bolt= A_bolt
                        .keyBy(new KeySelector<Tuple3<String,Integer,Boolean>, String>() {
                                    @Override
                                    public String getKey(Tuple3<String,Integer,Boolean> in) throws Exception {
                                        return in.f0;
                                    }
                                })
                        .connect(Sum)
                        .flatMap(new Process())
                        .setParallelism(parallelism);

Но я не знаю, как добавить ANALYSIS stream(B_bolt). Спасибо за вашу помощь.

1 Ответ

0 голосов
/ 08 ноября 2018

Flink поддерживает только потоковые операторы с одним входом и двумя входами. Ваши варианты:

  1. Используйте union () , чтобы создать объединенный поток, содержащий все элементы из всех трех потоков (которые все должны быть одного типа, хотя вы можете использовать Either, чтобы помочь с этим).
  2. После использования coFlatMap для объединения двух потоков подключите этот предварительный результат к третьему потоку, используя другой coFlatMap (или функцию coProcessFunction) для завершения обработки.

Или, возможно, комбинация этих двух методов предпочтительнее в вашем случае.

...