Почему количество используемых потоков больше, чем требуется? - PullRequest
2 голосов
/ 03 июня 2019

У меня есть приложение SpringBoot, в котором я разрешил не более 45 одновременных запросов. Теперь 1 запрос в своем пути вызывает 16 внешних служб параллельно с использованием threadPool A. Поэтому, имея в виду средний и худший случай, я сохранил следующие конфигурации:

ThreadPoolTaskExecutor A = new ThreadPoolTaskExecutor();
A.setCorePoolSize(400);
A.setMaxPoolSize(1000);
A.setQueueCapacity(10);
A.setThreadNamePrefix("async-executor");
A.initialize();

Я ожидал, что не более 45 * 16 = 720 потоков . Но при выполнении нагрузочного теста я заметил, что потоки продолжали открываться (проверено в дампе потоков), и через несколько минут он начал выдавать RejectedExecutionException.

RejectedExecutionException
Task ServiceX rejected from org.springframework.scheduling.concurrent.
ThreadPoolTaskExecutor$1@4221a19e[Running, pool
size = 1000, active threads = 2, queued tasks = 10, completed tasks = 625216]

Большинство потоков, как показано в дампе потоков

"executor-A-57" #579 prio=5 os_prio=0 tid=0x000000000193f800 nid=0x2e95 waiting on condition [0x00007fa9e820c000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x0000000582dadf90> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1073)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
    - None

Я хотел знать, чего мне здесь не хватает? Почему я получаю отказ?

Редактировать: Я попытался воспроизвести похожую вещь на коротком куске кода, вот так:

MainClass запускает длинный цикл. Внутри каждого цикла он вызывает сервис1 3 раза. На данный момент у меня есть демо-сервис, который просто имеет внутри тот же код Thread.sleep(100).

MainClass.java

package com.flappy.everything.threadpooling;

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MainClass {

    private static ThreadPoolTaskExecutor getExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(4);
        threadPoolTaskExecutor.setThreadNamePrefix("async-exec");
        threadPoolTaskExecutor.setCorePoolSize(4);
        threadPoolTaskExecutor.setQueueCapacity(2);
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolTaskExecutor outerExecutor = getExecutor();
        List<Service1> services = Arrays.asList(new Service1(), new Service1(), new Service1());
        for (int i = 0; i < 1000000; i++) {
            List<Future> futures = new ArrayList<>();
            for (Service1 service : services) {
                futures.add(outerExecutor.submit(() -> {
                    try {
                        service.set();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }));
            }
            for (Future future : futures) {
                future.get();
            }
        }
    }
}

Service1.java

package com.oyorooms.everything.threadpooling;

import org.springframework.scheduling.annotation.Async;

public class Service1 {
    public void set() throws InterruptedException {
        Thread.sleep(100);
        System.out.println(Thread.currentThread().getName());
    }
}

Таким образом, в идеале только 3 потока должны быть открыты для ThreadPool, который я дал, но, тем не менее, я получаю отклонение при запуске кода.

Ответы [ 2 ]

3 голосов
/ 04 июня 2019

Это был интересный.

Причина, по которой код, который вы перечислили, дает сбой, заключается в том, что время, необходимое для передачи элемента из рабочей очереди в рабочий поток, меньше, чем время, которое требуется основному потоку для помещения элементов в очередь.

Поток идет так:

if(there are active threads and is there availability on the queue){
    submit to the work queue for the worker threads to pick up // 1
} else {
   if(max pool size is not met){
      create a new thread with this task being its first task // 2
   } else { 
      reject // 3
   }
} 

То, что вы видите, это код, набравший // 3.

Когда вы впервые отправляете свои задачи, количество потоков будет меньше, чем максимальный размер пула. Первый раунд представленных заданий дойдет до // 2.

После первой итерации число активных потоков будет максимальным размером пула, и код будет пытаться отправить его на // 1.

Допустим, основной поток очень быстро помещает 3 элемента в очередь, так что 4 потока в ThreadPool не могут выполнить его достаточно быстро. Если это произойдет, мы передадим первый оператор if (поскольку в очереди нет места) и перейдем к другому. Поскольку максимальный размер пула уже достигнут, делать ничего не остается, кроме reject.

Это может быть дополнительно объяснено проверкой Javadocs ThreadPoolExecutor .

Если запрос не может быть поставлен в очередь, создается новый поток, если только он не превысит MaximumPoolSize, и в этом случае задача будет отклонена.

и позже

Прямые передачи обслуживания обычно требуют неограниченного MaximumPoolSizes, чтобы избежать отклонения новых представленных задач. Это, в свою очередь, допускает возможность неограниченного роста потока, когда команды продолжают поступать в среднем быстрее, чем они могут быть обработаны.

Чтобы решить вашу проблему, у вас есть два разумных варианта:

  1. Используйте SynchronousQueue . Поток, предлагающий SynchronousQueue, будет ждать бесконечно, пока другой поток не возьмет элемент, если он знает, что другой поток ожидает его получения. Заданный вами фиксированный размер очереди приведет к тому, что основной поток вернется (без блокировки), если установка не удалась (т. Е. Другой поток не сразу ее снял). Чтобы использовать SynchronousQueue с помощью Spring, установите емкость очереди равной нулю. setQueueCapacity(0). Также из Javadocs

    Хорошим выбором по умолчанию для рабочей очереди является SynchronousQueue, который передает задачи потокам, не удерживая их в противном случае.

  2. Установите размер очереди, который будет больше или равен числу одновременных задач, которые вы ожидаете отправить. Размер очереди, скорее всего, не достигнет этого размера в целом, но он защитит вас в будущем.

2 голосов
/ 03 июня 2019

Я бы порекомендовал проверить это, добавив 1 строку логгера, которая выводит исполнителя задачи, а затем подсчитывая 16 различных вызовов и 45 запросов.Там может быть несколько вещей, происходящих.

  1. Может быть, ThreadPoolTaskExecutor не является компонентом, и Spring вместо этого выбирает другой, настроенный в вашем приложении.
  2. Возможно, какая-то другая часть приложения также использует асинхронные вызовы

  3. Может быть некоторая ошибка в коде, который зацикливается навсегда

и т.д ...

Но хорошее место для начала, если у вас нет модульных тестов, - это просто записать, что происходит, и проанализировать ваши журналы.

...