Можно ли динамически добавлять новые CompletableFutures в CompletableFuture.allOf ()? - PullRequest
0 голосов
/ 24 сентября 2019

У меня есть база данных с 3 таблицами:

Таблица A, содержащая данные объектов A Таблица B, содержащая данные объектов B Таблица C, которая содержит данные объектов C

Объекты могутиметь 0 или 1 B объектов B объектов может иметь 0 или 1 объектов C (я знаю, они могут быть только в одной таблице, но это только для примера)

Я хочу сделать CSV-файл из всегобаза данных: каждая строка должна содержать ровно один объект A, необязательно свой объект B и, необязательно, свой объект C.

Для каждой таблицы существует асинхронное хранилище, которое возвращает CompletionStage.Поэтому, когда я выбираю объекты A из хранилища A, я получаю обратно CompletionStage<List<A>>.Когда он завершается, я делаю Map для каждого объекта A, заполняю его данными A и вызываю repositoryB.getB(A.id), который возвращает CompletionStage<Optional<B>>.Если значение B отсутствует, я добавляю новую строку в мой CSV-файл с данными внутри карты.Если B присутствует, я добавляю его значения на карту и вызываю repositoryC.getC(B.id), который возвращает CompletionStage<Optional<C>>.Если C присутствует, я добавляю его значения на карту и добавляю новую строку в файл CSV, если нет, тогда я просто добавляю новую строку.

Создание CSV завершено, когда все CompletionStages завершены.Я пытался использовать CompletableFuture.allOf (), но так как в начале я не знаю, сколько будет CompletionStage, я не могу добавить их все в метод allOf, поэтому я думаю, что мне нужно каким-то образом динамически добавлять Completionstages.Возможно ли это?

В настоящее время у меня есть рабочее решение, но оно блокируется после каждой выборки B и C, поэтому я хочу сделать весь код неблокирующим.

Это моя попытка разблокирования, но она не работает должным образом, так как некоторые из фьючерсов B и C не добавлены в список фьючерсов, поэтому код не ожидает их завершения:

CompletableFuture<List<CompletableFuture>> genereteCSV = repositoryA.getAs().thenApplyAsync(listA-> {
            List<CompletableFuture> futures = new ArrayList<>();
            for (A a : listA) {
                Map<String, String> values = new Map<>();
                addAvaluesToMap(values, A);

                CompletableFuture Bfuture = repositoryB.getB(A.id).thenAcceptAsync((optionalB -> {
                    if (optionalB.isPresent()) {
                        addValuesToMap(values, B);

                        CompletableFuture Cfuture = repositoryC.getC(B.id).thenAcceptAsync(optionalC-> {
                            if (optionalC.isPresent()) {
                                addAvaluesToMap(values, C);
                            } 
                            addMapValuesToCSV(values);
                        });
                        futures.add(Cfuture);

                    } else {
                        addMapValuesToCSV(values);
                    }
                }));

                futures.add(Bfuture);
            }
            return futures;
        });

        geerateCSV.thenApplyAsync(futureList-> CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[0])))
        .thenAccept(dummy->{System.out.println("CsV generation done");});

Ответы [ 2 ]

0 голосов
/ 24 сентября 2019

Вы используете реляционную базу данных.Должно быть проще и эффективнее написать запрос к базе данных, чтобы вернуть нужные вам данные в нужном вам формате, чем писать в java.SQL-запрос позволит вам очень легко объединить три таблицы и предоставить результаты в формате, который можно легко извлечь в формате CSV.Базы данных могут выполнять эти операции гораздо эффективнее, чем при написании собственной реализации.

0 голосов
/ 24 сентября 2019

По сути, это один из возможных планов для достижения неблокирующей обработки:

  1. Создать CompletableFuture для каждого объекта A (возможно, заполненного объектами B и C)

  2. Асинхронный сбор объектов A из CompletableFuture s

  3. Запись созданных объектов A в файл CSV

Примечания:В приведенном ниже примере я использовал addAvaluesToMap и addMapValuesToCSV, предполагая, что они у вас работают.Кроме того, я предполагаю, что использование CompletableFuture s оправдано вашими целями.

Это будет реализация подхода, описанного выше:

public void generateCSV() {
    repositoryA.getAs().thenAccept(listA -> {
        List<CompletableFuture<A>> futures = listA.stream()
                .map(a -> repositoryB.getB(a.id).thenComposeAsync(optionalB ->
                        optionalB.map(b -> repositoryC.getC(b.id).thenComposeAsync(optionalC -> {
                                    a.setB(b);
                                    return optionalC.map(c -> {
                                        b.setC(c);
                                        return CompletableFuture.completedFuture(a);
                                    }).orElse(CompletableFuture.completedFuture(a));
                                })
                        ).orElse(CompletableFuture.completedFuture(a)))
                ).collect(Collectors.toList());

        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenAccept(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .forEach(a -> {
                            Map<String, String> values = new HashMap<>();
                            addAvaluesToMap(values, a);
                            addMapValuesToCSV(values);
                        })
                )
                .exceptionally(throwable -> {
                    System.out.println("Failed generating CSV. Error: " + throwable);
                    return null;
                });
    }).exceptionally(throwable -> {
        System.out.println("Failed to get list of As. Error: " + throwable);
        return null;
    });
}
...