Асинхронный метод с последующим параллельно выполняемым методом в Java 8 - PullRequest
0 голосов
/ 07 мая 2018

Потратив день на изучение API java Concurrency, я все еще не совсем понял, как создать следующие функции с помощью классов CompletableFuture и ExecutorService:

Когда я получаю запрос на мою конечную точку REST, мне нужно:

  1. Запустите асинхронную задачу (включая запрос к базе данных, фильтрацию и т. Д.), Которая в конце выдаст мне список строковых URL-адресов
  2. В то же время ответьте обратно вызывающему REST с помощью HTTP OK, запрос получен, я работаю над ним
  3. Когда асинхронная задача завершена, мне нужно отправить HTTP-запросы (с полезной нагрузкой, которую мне дал вызывающий REST) ​​на URL-адреса, полученные с работы. Максимальное количество URL-адресов будет около 100, поэтому мне нужно, чтобы это происходило параллельно.
  4. В идеале у меня есть некоторый синхронизированный счетчик, который подсчитывает, сколько http-запросов было успешным / неудачным, и я могу отправить эту информацию обратно вызывающей стороне REST (URL-адрес, на который мне нужно отправить ее обратно, предоставляется внутри полезной нагрузки запроса). ).

У меня есть строительные блоки (методы, такие как: getMatchingObjectsFromDB (callerPayload), getURLs (resultOfgetMachingObjects), sendHttpRequest (Url, methodType) и т. Д.), Написанные для них, я просто не могу понять, как связать шаг 1 и шаг 3 вместе. Я бы использовал CompletableFuture.supplyAsync() для шага 1, а затем мне понадобился бы метод CompletableFuture.thenComponse для запуска шага 3, но мне не ясно, как можно реализовать параллелизм с этим API. Это довольно интуитивно понятно с ExecutorService executor = Executors.newWorkStealingPool();, который создает пул потоков на основе того, сколько вычислительной мощности доступно, и задачи можно отправлять с помощью метода invokeAll().

Как я могу использовать CompletableFuture и ExecutorService вместе? Или как я могу гарантировать параллельное выполнение списка задач с CompletableFuture? Демонстрация фрагмента кода будет высоко ценится. Спасибо.

Ответы [ 2 ]

0 голосов
/ 10 мая 2018

Для завершения приведем соответствующие части кода после очистки и тестирования (спасибо Mạnh Quyết Nguyễn):

Класс контроллера покоя:

@POST
@Path("publish")
public Response publishEvent(PublishEvent eventPublished) {
    /*
        Payload verification, etc.
    */

    //First send the event to the right subscribers, then send the resulting hashmap<String url, Boolean subscriberGotTheRequest> back to the publisher
    CompletableFuture.supplyAsync(() -> EventHandlerService.propagateEvent(eventPublished)).thenAccept(map -> {
      if (eventPublished.getDeliveryCompleteUri() != null) {
        String callbackUrl = Utility
            .getUri(eventPublished.getSource().getAddress(), eventPublished.getSource().getPort(), eventPublished.getDeliveryCompleteUri(), isSecure,
                    false);
        try {
          Utility.sendRequest(callbackUrl, "POST", map);
        } catch (RuntimeException e) {
          log.error("Callback after event publishing failed at: " + callbackUrl);
          e.printStackTrace();
        }
      }
    });

    //return OK while the event publishing happens in async
    return Response.status(Status.OK).build();
}

Класс обслуживания:

private static List<EventFilter> getMatchingEventFilters(PublishEvent pe) {
    //query the database, filter the results based on the method argument
}

private static boolean sendRequest(String url, Event event) {
    //send the HTTP request to the given URL, with the given Event payload, return true if the response is positive (status code starts with 2), false otherwise
}

static Map<String, Boolean> propagateEvent(PublishEvent eventPublished) {
    // Get the event relevant filters from the DB
    List<EventFilter> filters = getMatchingEventFilters(eventPublished);
    // Create the URLs from the filters
    List<String> urls = new ArrayList<>();
    for (EventFilter filter : filters) {
      String url;
      try {
        boolean isSecure = filter.getConsumer().getAuthenticationInfo() != null;
        url = Utility.getUri(filter.getConsumer().getAddress(), filter.getPort(), filter.getNotifyUri(), isSecure, false);
      } catch (ArrowheadException | NullPointerException e) {
        e.printStackTrace();
        continue;
      }
      urls.add(url);
    }

    Map<String, Boolean> result = new ConcurrentHashMap<>();
    Stream<CompletableFuture> stream = urls.stream().map(url -> CompletableFuture.supplyAsync(() -> sendRequest(url, eventPublished.getEvent()))
                                                                                 .thenAcceptAsync(published -> result.put(url, published)));
    CompletableFuture.allOf(stream.toArray(CompletableFuture[]::new)).join();
    log.info("Event published to " + urls.size() + " subscribers.");
    return result;
}

Отладка это была немного сложнее, чем обычно, иногда код просто волшебным образом останавливался. Чтобы исправить это, я поместил только части кода в асинхронную задачу, которая была абсолютно необходима, и я убедился, что код в задаче использует поточно-ориентированные вещи. Также я сначала был тупым, и мои методы внутри EventHandlerService.class использовали ключевое слово synchronized, в результате чего CompletableFuture внутри метода класса Service не выполнялся, поскольку по умолчанию он использует пул потоков.

Часть логики, помеченная как synchronized, становится синхронизированным блоком, позволяя выполнять только один поток в любой момент времени.

0 голосов
/ 07 мая 2018

Вы должны использовать join(), чтобы дождаться окончания всей нити.

Создать Map<String, Boolean> result, чтобы сохранить результат запроса.

В вашем контроллере:

public void yourControllerMethod() {

  CompletableFuture.runAsync(() -> yourServiceMethod());
}

К вашим услугам:

// Execute your logic to get List<String> urls

List<CompletableFuture> futures = urls.stream().map(v -> 
 CompletableFuture.supplyAsync(url -> requestUrl(url))
           .thenAcceptAsync(requestResult -> result.put(url, true or false))
).collect(toList()); // You have list of completeable future here

Затем используйте .join() для ожидания всех потоков (помните, что ваша служба уже выполняется в своем собственном потоке)

CompletableFuture.allOf(futures).join();

Затем вы можете определить, какой из них успешен / неудач, открыв result map

Редактировать

Пожалуйста, опубликуйте код процедуры, чтобы другие могли вас понять.

Я прочитал ваш код и вот необходимые изменения:

Когда этот цикл for не был закомментирован, веб-сервер получателя получил один и тот же запрос дважды, Я не понимаю цель этого цикла.

Извините в моем предыдущем ответе, я не убрал его. Это просто временная идея, которую я забыл удалить в конце: D

Просто удалите его из кода

// allOf () принимает только массивы, поэтому список необходимо преобразовать / * Код никогда не преодолеет эту часть (я знаю, что allOf () является блокирующим вызовом), даже спустя много времени после того, как получатель получил HTTP-запрос

   with the correct payload. I'm not sure yet where exactly the code gets stuck */

Ваша карта должна быть ConcurrentHashMap, потому что вы изменяете ее одновременно позже.

Map<String, Boolean> result = new ConcurrentHashMap<>();

Если ваш код все еще не работает должным образом, я предлагаю удалить часть parallelStream().

CompletableFuture и parallelStream используют общий пул разветвлений. Я думаю, что бассейн исчерпан.

И вы должны создать свой собственный пул для CompletableFuture:

Executor pool = Executors.newFixedThreadPool(10);

И выполнить ваш запрос, используя этот пул:

CompletableFuture.supplyAsync(YOURTASK, pool).thenAcceptAsync(Yourtask, pool)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...