Цепочка двух CompletableFutures, каждый из которых возвращает списки - PullRequest
0 голосов
/ 03 декабря 2018

Мне трудно разобраться в этом, и я могу получить помощь от тех, кто более опытен и осведомлен, чем я.

Основная проблема заключается в том, что мне нужно получить Список объектов, а затемдля каждого возвращенного объекта извлеките некоторые детали и вставьте детали в объект.Я хотел бы быть эффективным в этом;Сначала мне нужно получить список файлов данных, но, получив его, я могу сделать вызовы, чтобы получить их теги одновременно, а затем дождаться возвращения всех ответов getTags, прежде чем их обработать.

public class DataFile {
    // DataFileDao returns all DataFile properties, except Tags
    private List<Tags> tags;
    ...
}

Я просто не могу понять, как сделать это функционально, используя CompletableFutures и потоки.Вот основной код, который я использую, хотя, и если бы кто-то мог помочь мне добраться до финиша, я был бы очень признателен:

public CompletableFuture<List<DataFile>> getDataFilesWithTags() {

    final CompletableFuture<List<DataFile>> dataFileFutures = this.dataFileDao.getDataFiles()
        .thenApply(HttpResponse::body).thenApply(this::toDataFileList);

    final CompletableFuture<List<List<Tag>>> tagFutures = dataFileFutures
        .thenCompose(dataFiles -> HttpUtils.allAsList(dataFiles.stream()
            .map(file -> this.tagDao.getLatestTagsForDataFile(file.getId())).collect(toList())));

    final CompletableFuture<List<DataFile>> filesWithTags = dataFileFutures.thenCombine(tagFutures,
        (files, tags) -> {
            for (int i = 0; i < files.size(); i++) {
                files.get(i).setTags(tags.get(i));
            }

            return files;
        });

    return fileWithTags;
}

/**
 * Transforms a generic {@link List} of {@link CompletableFuture}s to a {@link CompletableFuture} containing a
 * generic {@link List}.
 *
 * @param futures the {@code List} of {@code CompletableFuture}s to transform
 * @param         <T> the type of {@link CompletableFuture} to be applied to the {@link List}
 * @return a {@code CompletableFuture} containing a {@code List}
 * @throws NullPointerException if {@code futures} is null
 */
public static <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    Validate.notNull(futures, "futures cannot be null");
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
            .thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
}

Должен быть более чистый и более функциональный способ сделать это, верно?

Абстрактное представление того, что я хотел бы сделать:

public class ParentObject {

    // RETURNED BY ParentObjectDao.getAllParentObjects()
    private String id;

    // *NOT* RETURNED BY ParentObjectDao.getAllParentObjects()
    // MUST BE RETRIEVED BY MAKING SUPPLEMENTAL CALL TO ParentObjectDao.getParentsChildren(String parentObjectId)
    private List<ChildObject> details;
}

public class ChildObject {

    private String id;
    private String description;
}

public class ParentObjectDao {

    public CompletableFuture<List<ParentObject>> getAllParentObjects();

    public CompletableFuture<List<ChildObject>> getChildrenForParent(String parentObjectId);
}

public class Something {

    private final ParentObjectDao dao;

    public List<ParentObject> getParentObjectsWithChildren(){

        // PSEUDO-LOGIC
        final List<ParentObject> parentsWithChildren = dao.getAllParentObjects()
            .thenApply(List::stream)
            .thenCompose(parentObject -> dao.getChildrenForParent(parentObject.getId()))
            .thenApply(parentObject::setChildren)
            .collect(toList);

        return parentsWithChildren;
    }
}

1 Ответ

0 голосов
/ 03 декабря 2018

Код, который у вас есть, не сильно распараллеливает.Вы обрабатываете только один CompletableFuture за раз, связывая операции с ним.Таким образом, если у вас есть 1000 файлов данных, они все равно будут обрабатываться последовательно.

Кроме того, с точки зрения дизайна и читаемости, CompletableFuture работает на слишком низком уровне (вам действительно нужно связать thenApply(HttpResponse::body).thenApply(this::toDataFileList), если вы не можете правильно инкапсулировать преобразование и иметьCompletableFuture просто представляет один метод?)

Используя ваш псевдокод, что примерно так:

CompletableFuture<List<ParentObject>> populateAsync(List<ParentObject> parents) {

  //get the children of each parent in parallel, store the futures in a list
  List<CompletableFuture<ParentObject>> futures = 
    parents.stream() 
           .map(parent ->
                   parentObjectDao.getChildrenForParent(parent.getId())
                                  .thenApply(parent::setChildren))  //assuming setChildren returns the parent object
           .collect(Collectors.toList()); //we need this stream terminal operation to start all futures before we join on the first one

  //wait for all of them to finish and then put the result in a list
  return CompletableFuture.supplyAsync(() -> 
                                     futures.stream()
                                            .map(CompletableFuture::join)
                                            .collect(Collectors.toList());    
}

Тогда вы сможете сделать что-то вроде этого:

CompletableFuture<List<ParentObject>> getAllParentObjects()
          .thenApply(this::populateAsync)

(у меня могут быть некоторые синтаксические ошибки, потому что я просто написал это прямо здесь, но вы должны понять).

...