Мне нужно получить данные из нескольких файлов данных и создать одну запись в файле-приемнике, в основном статическую java.util.List
.
Если я указываю env.execute
после каждого набора данных, java-объект обновляется после каждого файлового процесса
Если я, наконец, укажу env.execute
, обработка файла будет происходить параллельно, и я, наконец, получу неправильные манипуляции с данными.
Есть предложения?
try {
scanDirectory();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
for(int i =0;i<fileArray.length;i++) {
DataStream<String> text = env.readTextFile(fileArray[i].getPath());
System.out.println(fileArray[i].getPath());
if(fileArray[i].getName().contains("post")) {
DataStream<String> DataStream = text.flatMap(new WirlsChargesPostBillMapper());
// DataStream.writeAsText("C:\\Users\\workspace\\POC\\resources\\results.txt");
// env.execute("First FLink POC");
}
if(fileArray[i].getName().contains("pre")) {
DataStream<String> DataStream2 = text.flatMap(new WirlsChargesPreBillMapper());
// DataStream2.writeAsText("C:\\Users\\workspace\\POC\\resources\\results1.txt");
// env.execute("First FLink POC");
}
// DataStream<String> keysStream = DataStream.forward();
// keysStream.writeUsingOutputFormat(new JedisOutputFormat(null));
// logger.info("final class"+LineDumpList.get(0).getBanNumber());
env.execute("First FLink POC");
}`