В Flink Process файлы один за другим и обновляют общий объект Java после каждой обработки файла - PullRequest
0 голосов
/ 07 июля 2019

Мне нужно получить данные из нескольких файлов данных и создать одну запись в файле-приемнике, в основном статическую java.util.List.

Если я указываю env.execute после каждого набора данных, java-объект обновляется после каждого файлового процесса

Если я, наконец, укажу env.execute, обработка файла будет происходить параллельно, и я, наконец, получу неправильные манипуляции с данными.

Есть предложения?

 try {
        scanDirectory();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        for(int i =0;i<fileArray.length;i++) {
            DataStream<String> text = env.readTextFile(fileArray[i].getPath());
            System.out.println(fileArray[i].getPath());
            if(fileArray[i].getName().contains("post")) {
                DataStream<String> DataStream = text.flatMap(new WirlsChargesPostBillMapper());
            //  DataStream.writeAsText("C:\\Users\\workspace\\POC\\resources\\results.txt");
            //  env.execute("First FLink POC");
            }

            if(fileArray[i].getName().contains("pre")) {
                DataStream<String> DataStream2 = text.flatMap(new WirlsChargesPreBillMapper());
            //  DataStream2.writeAsText("C:\\Users\\workspace\\POC\\resources\\results1.txt");
            //  env.execute("First FLink POC");
            }
            //  DataStream<String> keysStream = DataStream.forward();
        //  keysStream.writeUsingOutputFormat(new JedisOutputFormat(null));
        //  logger.info("final class"+LineDumpList.get(0).getBanNumber());
            env.execute("First FLink POC");
        }`
...