Обработка преобразования в цикле, обновление многих списков до CompletableFuture - PullRequest
1 голос
/ 20 июня 2019

У меня есть список строк и для каждого элемента в списке (который в основном является именем файла), я должен сделать некоторую обработку. У меня работает последовательный процесс, но я хотел бы преобразовать его в CompleatableFuture, где мне нужно обновить много списков. Как я могу это сделать?

Это мой оригинальный код

Path fileLocation = findLocationForFiles(fileDTSuffix);
Map<String, Map<String, String>> csvMap = getCsvToMap("filename", fileLocation);
List<String> filesMissingCSV = new ArrayList<>();
List<String> expiredAssets = new ArrayList<>();

int filesUploaded = 0;
int filesProcessed = 0;
//TODO : Convert below to use CompletableFuture to perform the actions in thread

for (String fileName : filesToProcessList) {
    if(filesProcessed > 100){
        System.out.println("Processed 100 files - calling cleanupTempDir ..... ");
        cleanupTempDir(fileLocation);
        filesProcessed = 0;
    }
    Path path = Paths.get(fileName);

    File file = new File(fileName);
    String baseFileName = file.getName();
    String translatedNameFromVW = translateFilenameFromVW(baseFileName);
    String translatedNameFromVWSansConfig = translateFilenameFromVWSansConfig(baseFileName);
    String translatedNameToVW = translateFilenameToVW(baseFileName);
    boolean isFilenameEqFromVW = baseFileName.equalsIgnoreCase(translatedNameFromVW);
    boolean isFilenameEqToVW = baseFileName.equalsIgnoreCase(translatedNameToVW);
    Map<String, String> csvFileInfo = getCSVMapAttributes(csvMap, baseFileName, translatedNameFromVW, translatedNameFromVWSansConfig, translatedNameToVW);

    if (null == csvFileInfo) {
        filesMissingCSV.add(fileName + " | " + file.getName());
    } else {
        if (!isAssetExpired(csvFileInfo)) {
            AssetMaster am = generateJSON(fileName, csvFileInfo);
            addToCassandra(path, am, translatedNameToVW);
            printJson(am, fileLocation);
            filesUploaded++;
            filesProcessed++;
        } else {
            expiredAssets.add(fileName + " | " + file.getName());
        }
    }
}

Я пытаюсь преобразовать его в CompletableFutures, как показано ниже

List<CompletableFuture> asyncThreadsList = new ArrayList();

for (String fileName : filesToProcessList) {
    asyncThreadsList.add(CompletableFuture.runAsync(() -> {
        try {
            processEachFile(csvMap, fileName, fileLocation);
        } catch (IOException ex) {
            Logger.getLogger(UploadThreaded.class.getName()).log(Level.SEVERE, null, ex);
        }
    }));
}
CompletableFuture.allOf(asyncThreadsList.toArray(new CompletableFuture[0]))
   .thenRunAsync(() -> System.out.println("Ended doing things"));

И мое определение processEachFile(csvMap, fileName, fileLocation) такое, как показано ниже

private static CompletableFuture<Void> processEachFile(Map<String, Map<String, String>> csvMap, String fileName, Path fileLocation) throws IOException {

    if (filesProcessed > 100) {
        System.out.println("Processed 100 files - calling cleanupTempDir ..... ");
        cleanupTempDir(fileLocation);
        filesProcessed = 0;
    }
    Path path = Paths.get(fileName);

    File file = new File(fileName);
    String baseFileName = file.getName();
    String translatedNameFromVW = translateFilenameFromVW(baseFileName);
    String translatedNameFromVWSansConfig = translateFilenameFromVWSansConfig(baseFileName);
    String translatedNameToVW = translateFilenameToVW(baseFileName);
    boolean isFilenameEqFromVW = baseFileName.equalsIgnoreCase(translatedNameFromVW);
    boolean isFilenameEqToVW = baseFileName.equalsIgnoreCase(translatedNameToVW);
    Map<String, String> csvFileInfo = getCSVMapAttributes(csvMap, baseFileName, translatedNameFromVW, translatedNameFromVWSansConfig, translatedNameToVW);

    if (null == csvFileInfo) {
        filesMissingCSV.add(fileName + " | " + file.getName());
    } else {
        if (!isAssetExpired(csvFileInfo)) {
            AssetMaster am = generateJSON(fileName, csvFileInfo);
            addToCassandra(path, am, translatedNameToVW);
            printJson(am, fileLocation);
            filesUploaded++;
            filesProcessed++;
        } else {
            expiredAssets.add(fileName + " | " + file.getName());
        }
    }
    return null;
}

Моя проблема в том, что мне нужно объединять эти списки из каждого прогона: filesMissingCSV, expiredAssets, а также обновлять счетчики filesUploaded и filesProcessed из каждого прогона, чтобы получить окончательные значения. И мне трудно понять, как этого достичь.

Возможно ли это сделать и как этого достичь?

Спасибо

1 Ответ

1 голос
/ 21 июня 2019

@ adbdkb прокомментировал У меня фактически есть внешний цикл для каталогов, и этот цикл для файлов в каждом каталоге. Используя тот же механизм, который вы показали здесь, могу ли я создать список CompletableFutures для внешнего цикла?

, поэтому я решил предоставить простое решение, охватывающее только основные проблемы ОП.

Сначала я создал класс для обработки результата для каждого файла.

class ProcessDetail {
  private int fileUploaded;
  private int fileProcessed;
  private List<String> expireFile = new ArrayList<>();
  private List<String> missingFile = new ArrayList<>();
}

также есть несколько способов упростить задачу. Первый метод processEachDir. Я просто передаю список строк в качестве аргумента, который представляет файлы каталога. этот метод возвращает List<CompletableFuture<ProcessDetail>>.

private static List<CompletableFuture<ProcessDetail>> processEachDir(List<String> dir) {
    System.out.println(dir + " time: " + LocalDateTime.now());
    sleep(1);
    return dir.stream()
            .map(fileName -> processEachFile(fileName))
            .collect(Collectors.toList());
} 

Второй метод - processEachFile, который получает строковый параметр, представляющий имя файла. этот метод возвращает CompletableFuture<ProcessDetail>

private static CompletableFuture<ProcessDetail> processEachFile(String fileName) {
    System.out.println(fileName + " time: " + LocalDateTime.now());

    return CompletableFuture.supplyAsync(() -> {
        ProcessDetail processDetail = new ProcessDetail();
        if (csvFileInfo()) {
            processDetail.getMissingFile().add(fileName);
        } else if (!isAssetExpired()) {
            processDetail.setFileProcessed(1);
            processDetail.setFileUploaded(1);
        } else {
            processDetail.getExpireFile().add(fileName);
        }
        return processDetail;
    });
}

сейчас время для выполнения задачи: Для каждого каталога ниже приведен список CompletableFuture<ProcessDetail>, файлы процессов и другие материалы. и поместите его результат в карту с именем каталога в качестве ключа. processDetails.put(dir.getKey(), result);

directories.entrySet().forEach(dir -> {
        List<CompletableFuture<ProcessDetail>> list = processEachDir(dir.getValue());
        CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                .thenAccept(nothing -> {
                    ProcessDetail result = list.stream()
                            .map(CompletableFuture::join).reduce(new ProcessDetail(), merge);
                    processDetails.put(dir.getKey(), result);

                });
});

DEMO

...