Я использую getSideOutput
для создания побочного потока вывода. Наличие элемента в потоке предварительной обработки перед обработкой с помощью getSideOutput, но при вызове метода getSideOutput
ничего не выдается.
код следовать
DataStream<String> asyncTable =
join3
.flatMap(new ExtractList())
.process( // detect code using for test
new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out)
throws Exception {
System.out.println(value); // can detect elements
}
})
.getSideOutput(new OutputTag<>("asyTab", TypeInformation.of(String.class)));
, но при вызове getSideOutput
метода после
DataStream<String> asyncTable =
join3
.flatMap(new ExtractList())
.getSideOutput(new OutputTag<>("asyTab", TypeInformation.of(String.class)))
.process(
new ProcessFunction<String, String>() {
@Override
public void processElement(String value, Context ctx, Collector<String> out)
throws Exception {
System.out.println(value); // nothing detect elements
}
});
ExtractList
следующим образом
import org.apache.flink.util.Collector;
public class ExtractList extends RichFlatMapFunction<NewTableA, String> {
@Override
public void flatMap(NewTableA value, Collector<String> out) throws Exception {
String tableName = "NewTableA";
String primaryKeyName = "PA1";
String primaryValue = value.getPA1().toString();
String result = tableName+":"+primaryKeyName+":"+primaryValue;
//System.out.println(result); // right result output
out.collect(result);
}
}
почему getSideOutput
для создания побочного вывода поток без элементов.