Функция процесса Flink не возвращает данные в Sideoutputstream - PullRequest
0 голосов
/ 25 мая 2020

Я пытаюсь проверить JSONObject с помощью набора правил, если json совпадает с набором правил, он вернет соответствующее правило, а JSONObject, если нет, он вернет JSONObject в Sideoutput, все это обрабатывается в ProcessFuntion, Я получаю основной вывод, но не могу захватить побочный вывод

Поток SideOutput определяется следующим образом

public final static OutputTag<org.json.JSONObject> unMatchedJSONSideOutput = new OutputTag<org.json.JSONObject>(
            "unmatched-side-output") {};

Функция ProcessFunction определяется как ниже

public class RuleFilter extends ProcessFunction<Tuple2<String,org.json.JSONObject>,Tuple2<String,org.json.JSONObject>> {
@Override
    public void processElement(Tuple2<String, org.json.JSONObject> value,
            ProcessFunction<Tuple2<String, org.json.JSONObject>, Tuple2<String, org.json.JSONObject>>.Context ctx,
            Collector<Tuple2<String, org.json.JSONObject>> out) throws Exception {

        if(this.value.matches((value.f1))) {
        out.collect(new Tuple2<String, org.json.JSONObject>(value.f0,value.f1));
        }else {
            ctx.output(RuleMatching.unMatchedJSONSideOutput,value.f1);
        }
    }
}

Я печать основного потока данных, как показано ниже

    DataStream<Tuple2<String, org.json.JSONObject>> matchedJSON =
                            inputSignal.map(new MapFunction<org.json.JSONObject, Tuple2<String, org.json.JSONObject>>() {
                                @Override
                                public Tuple2<String, org.json.JSONObject> map(org.json.JSONObject input) throws Exception {
                                    return new Tuple2<>(value, input);
                                }
                            }).process(new RuleFilter()).print("MatchedJSON=>");

matchedJSON .print("matchedJSON=>");

Я печатаю побочный вывод, как показано ниже

DataStream<org.json.JSONObject> unmatchedJSON =
                        ((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(new MapFunction<Tuple2<String, org.json.JSONObject>, org.json.JSONObject>() {
                            @Override
                            public org.json.JSONObject map(Tuple2<String, org.json.JSONObject> value) throws Exception {
                                return value.f1;
                            }
                        })).getSideOutput(unMatchedJSONSideOutput );

                unmatchedJSON.print("unmatchedJSON=>");

Основной поток печатает вывод, но побочный вывод не печатается из-за недопустимого json пожалуйста, помогите в решении вопрос

1 Ответ

1 голос
/ 26 мая 2020

Проблема здесь:

DataStream<org.json.JSONObject> unmatchedJSON =
    ((SingleOutputStreamOperator<org.json.JSONObject>) matchedJSON.map(...))
    .getSideOutput(unMatchedJSONSideOutput);

Вы должны звонить getSideOutput непосредственно на matchedJSON, а не в результате применения к нему MapFunction. Только ProcessFunction может иметь боковой выход, и он должен исходить непосредственно от ProcessFunction. Вы обманом заставили компилятор принять это, преобразовав выходной поток из карты, но среда выполнения не может сделать с этим ничего значимого.

...