Параллельные HTTP-запросы Java с CompleteableFuture не очень производительны - PullRequest
0 голосов
/ 09 ноября 2018

У меня есть веб-служба, которая выполняет http-вызовы другой службы. Веб-служба разбивает запросы «один ко многим» и пытается сделать параллельные запросы «один к одному». Для тестирования производительности я сохранил пропускную способность на уровне бэкэнда постоянной. Например, мне удалось достичь пропускной способности 1000 рэк / сек с задержкой 99-го процентиля 100 мс. Таким образом, чтобы протестировать параллельные запросы, разбитые на 2 запроса к бэкэнду за каждый запрос к веб-службе, я отправил 500 запросов в секунду, но достиг всего лишь задержки 150 мс с 99-й процентилью. Создаю ли я конфликт между потоками и / или блокирую http-вызовы с помощью следующего кода?

import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class Foo {
  private HTTPClient myHTTPClient = new HTTPClient("http://my_host.com");  //java ws rs http client

  private interface Handler<REQ, RES> {
    RES work(REQ req);
  }

  private <REQ, RES> CompletableFuture<RES> getAsync(REQ req, Handler<REQ, RES> handler) {
    CompletableFuture<RES> future = CompletableFuture.supplyAsync(() -> {
      return handler.work(req);
    });

    return future;
  }

  public RouteCostResponse getRouteCost(Point sources, List<Point> destinations) {
    Map<String, Request> requests = new HashMap<>();

    // create request bodies and keep track of request id's
    for (Point destination : destinations) {
      requests.put(destination.getId(), new RouteCostRequest(source, destination))
    }

    //create futures
    ConcurrentMap<String, CompletableFuture<RouteCost>> futures = requests.entrySet().parallelStream()
        .collect(Collectors.toConcurrentMap(
            entry -> entry.getKey(),
            entry -> getAsync(entry.getValue(), route -> myHTTPClient.getRoute(route)))
        ));

    //retrieve results
    ConcurrentMap<String, RouteCost> result = futures.entrySet().parallelStream()
        .collect(Collectors.toConcurrentMap(
            entry -> entry.getKey(),
            entry -> entry.getValue().join()
        ));

    RouteCostResponse response = new RouteCostResponse(result);

    return response;
  }
}

1 Ответ

0 голосов
/ 15 ноября 2018

В следующем коде нет противоречий между потоками, хотя кажется, что я столкнулся с проблемами ввода-вывода. Ключ должен использовать явный пул потоков. ForkJoinPool или Executors.fixedThreadPool

import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class Foo {
  private HTTPClient myHTTPClient = new HTTPClient("http://my_host.com");  //java ws rs http client
  private static final ForkJoinPool pool = new ForkJoinPool(1000);

  private interface Handler<REQ, RES> {
    RES work(REQ req);
  }

  private <REQ, RES> CompletableFuture<RES> getAsync(REQ req, Handler<REQ, RES> handler) {
    CompletableFuture<RES> future = CompletableFuture.supplyAsync(() -> {
      return handler.work(req);
    });

    return future;
  }

  public RouteCostResponse getRouteCost(Point sources, List<Point> destinations) {
    Map<String, Request> requests = new HashMap<>();

// create request bodies and keep track of request id's
    for (Point destination : destinations) {
      requests.put(destination.getId(), new RouteCostRequest(source, destination))
    }

    //create futures
    ConcurrentMap<String, CompletableFuture<RouteCost>> futures = requests.entrySet().stream()
    .collect(Collectors.toConcurrentMap(
        entry -> entry.getKey(),
        entry -> getAsync(entry.getValue(), route -> myHTTPClient.getRoute(route)))
    ));

    //retrieve results
    ConcurrentMap<String, RouteCost> result = futures.entrySet().stream()
    .collect(Collectors.toConcurrentMap(
        entry -> entry.getKey(),
        entry -> entry.getValue().join()
    ));

    RouteCostResponse response = new RouteCostResponse(result);

    return response;
  }
}
...