Производительность пула многопоточности executorService - PullRequest
0 голосов
/ 07 января 2019

Я использую библиотеку параллелизма Java ExecutorService для запуска своих задач. Порог для записи в базу данных составляет 200 QPS, однако, эта программа может достичь только 20 QPS с 15 потоками. Я пробовал 5, 10, 20, 30 потоков, и они были даже медленнее, чем 15 потоков. Вот код:

ExecutorService executor = Executors.newFixedThreadPool(15);
List<Callable<Object>> todos = new ArrayList<>();

for (final int id : ids) {
    todos.add(Executors.callable(() -> {
        try {
            TestObject test = testServiceClient.callRemoteService();
    SaveToDatabase();
        } catch (Exception ex) {}
    }));
}
try {
    executor.invokeAll(todos);
} catch (InterruptedException ex) {} 
executor.shutdown();

1) Я проверил загрузку ЦП сервера Linux, на котором запущена эта программа, и составил 90% и 60% (у него 4 ЦП). Использование памяти составило всего 20%. Таким образом, процессор и память все еще были в порядке. Загрузка ЦП сервера базы данных была низкой (около 20%). Что может помешать скорости достичь 200 QPS? Может быть, эта служба вызова: testServiceClient.callRemoteService()? Я проверил конфигурацию сервера для этого вызова, и он позволяет большое количество вызовов в секунду.

2) Если число идентификаторов в идентификаторах превышает 50000, рекомендуется ли использовать invokeAll? Должны ли мы разделить его на более мелкие партии, такие как 5000 каждой партии?

Ответы [ 4 ]

0 голосов
/ 07 января 2019

Почему вы ограничиваете себя таким небольшим количеством потоков?

Таким образом, вы упускаете возможности для повышения производительности. Кажется, что ваши задачи действительно не ограничены процессором. Сетевые операции (удаленный сервис + запрос к базе данных) могут занимать большую часть времени для завершения каждой задачи. В это время, когда одна задача / поток должен ждать какого-то события (сеть, ...), другой поток может использовать ЦП. Чем больше потоков вы делаете доступными для системы, тем больше потоков может ожидать завершения сетевого ввода-вывода, в то время как некоторые потоки одновременно используют ЦП.

Я предлагаю вам резко увеличить количество потоков для исполнителя. Поскольку вы говорите, что оба удаленных сервера используются недостаточно, я предполагаю, что хост, на котором работает ваша программа, является узким местом на данный момент. Попробуйте увеличить (удвоить?) Количество потоков, пока узкое место не станет равным загруженности вашего процессора до 100%, или память или удаленная сторона.

Кстати, вы shutdown исполнитель, но вы на самом деле ждете завершения задач? Как вы измеряете «QPS»?

Мне приходит в голову еще одна вещь: как обрабатываются соединения с БД? То есть как синхронизируются SaveToDatabase() с? Все потоки разделяют (и конкурируют) одно соединение? Или, что еще хуже, каждый поток создаст новое соединение с БД, сделает свое дело, а затем снова закроет соединение? Это может быть серьезным узким местом, поскольку установление TCP-соединения и выполнение аутентификации может потребовать столько же времени, сколько и выполнение простого оператора SQL.

Если число идентификаторов в идентификаторах превышает 50000, целесообразно ли использовать invokeAll? Должны ли мы разделить его на более мелкие партии, такие как 5000 каждая партия?

Как уже писал @Vaclav Stengl, у Исполнителей есть внутренние очереди, в которые они ставят в очередь и из которых они обрабатывают задачи. Так что не нужно беспокоиться об этом. Вы также можете просто вызвать submit для каждой отдельной задачи, как только вы ее создали. Это позволяет уже начать выполнение первых задач, пока вы еще создаете / готовите более поздние задачи, что имеет смысл, особенно когда каждая задача создание занимает сравнительно много времени, но не повредит во всех других случаях. Думайте о invokeAll как об удобном методе для случаев, когда у вас уже есть набор задач. Если вы создаете задачи последовательно самостоятельно и у вас уже есть доступ к ExecutorService для их выполнения, просто submit() их a.s.a.p.

0 голосов
/ 07 января 2019

В этом коде нет ничего, что мешало бы такой частоте запросов, кроме повторного создания и уничтожения пула потоков, что очень дорого. Я предлагаю использовать Streams API, который не только проще, но и использует встроенный пул потоков

int[] ids = ....
IntStream.of(ids).parallel()
                 .forEach(id -> testServiceClient.callRemoteService(id));

Вот тест, использующий тривиальный сервис. Основные накладные расходы - это задержка при создании соединения.

public static void main(String[] args) throws IOException {
    ServerSocket ss = new ServerSocket(0);
    Thread service = new Thread(() -> {
        try {
            for (; ; ) {
                try (Socket s = ss.accept()) {
                    s.getOutputStream().write(s.getInputStream().read());
                }
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    });
    service.setDaemon(true);
    service.start();

    for (int t = 0; t < 5; t++) {
        long start = System.nanoTime();
        int[] ids = new int[5000];
        IntStream.of(ids).parallel().forEach(id -> {
            try {
                Socket s = new Socket("localhost", ss.getLocalPort());
                s.getOutputStream().write(id);
                s.getInputStream().read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        long time = System.nanoTime() - start;
        System.out.println("Throughput " + (int) (ids.length * 1e9 / time) + " connects/sec");
    }
}

печать

Throughput 12491 connects/sec
Throughput 13138 connects/sec
Throughput 15148 connects/sec
Throughput 14602 connects/sec
Throughput 15807 connects/sec

Использование ExecutorService было бы лучше, как упоминает @ grzegorz-piwowarek.

    ExecutorService es = Executors.newFixedThreadPool(8);
    for (int t = 0; t < 5; t++) {
        long start = System.nanoTime();
        int[] ids = new int[5000];
        List<Future> futures = new ArrayList<>(ids.length);
        for (int id : ids) {
            futures.add(es.submit(() -> {
                try {
                    Socket s = new Socket("localhost", ss.getLocalPort());
                    s.getOutputStream().write(id);
                    s.getInputStream().read();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }));
        }
        for (Future future : futures) {
            future.get();
        }
        long time = System.nanoTime() - start;
        System.out.println("Throughput " + (int) (ids.length * 1e9 / time) + " connects/sec");
    }
    es.shutdown();

В этом случае дает почти одинаковые результаты.

0 голосов
/ 07 января 2019

Возможно, проблема QPS заключается в ограничении пропускной способности или выполнении транзакции (это заблокирует таблицу или строку). Так что просто увеличить размер пула не получалось. Дополнительно можно попробовать использовать шаблон производитель-потребитель.

0 голосов
/ 07 января 2019

О пакетном разделении: ExecutorService имеет внутреннюю очередь для хранения задач. В вашем случае ExecutorService executor = Executors.newFixedThreadPool(15); имеет 15 потоков, поэтому максимум 15 задач будут выполняться одновременно, а другие будут храниться в очереди. Размер очереди может быть параметризован. По умолчанию размер будет увеличиваться до макс. Вызов InvokeAll внутри метода execute, и этот метод помещает задачи в очередь, когда все потоки работают.

Имхо, есть 2 возможных сценария, почему процессор не на 100%:

  1. попробуйте увеличить пул потоков
  2. поток ожидает testServiceClient.callRemoteService() до завершено, и тем временем процессор работает starwing
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...