Невозможно создать поток для обработки сокета (Block IO) с помощью пула потоков - PullRequest
0 голосов
/ 05 ноября 2018

Я хочу создать новую thead и назначить ее исполнителю пула потоков, чтобы он мог обрабатываться одновременно. Размер ядра такого пула потоков равен 5, а максимальный - 20. Однако в приведенном ниже коде он не может создать больше потоков, чем размер ядра. Если я раскомментировал код if (socket != null) { socket.close(); } (на самом деле, я не могу закрыть его здесь, так как сокет должен обрабатываться в TestHandler), можно создать больше потоков вплоть до максимума пула потоков, равного 20. Так почему это может случиться? Кто-нибудь может помочь объяснить это? Размер workQueue можно настроить соответствующим образом, и я использовал apache ab, чтобы сделать тест. например ab -n 1000 -c 10 http://localhost:8080/

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;
import java.util.concurrent.*;

public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
        MultiThreadServer server = new MultiThreadServer();
        server.start();
    }

    private static ThreadFactory builder = new ThreadFactoryBuilder().setNameFormat("Demo Task Executor #%d").build();

    public void start() throws IOException {
        ServerSocket serverSocket = new ServerSocket(8080);
        Socket socket = null;
        ExecutorService executor = new ThreadPoolExecutor(5,
                20,
                10,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10),
                builder);
        for (; ; ) {
            socket = serverSocket.accept();

            executor.execute(new TestHandler(socket));
//            if (socket != null) {
//                socket.close();
//            }
        }
    }

    class TestHandler implements Runnable {
        private Socket socket;

        public TestHandler(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(new Random().nextInt(100));
                socket.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread());
        }
    }

}

1 Ответ

0 голосов
/ 05 ноября 2018

Если вы проверите реализацию метода execute в классе ExecutorService, вы увидите следующий код:

//...................

        if (isRunning(c) && workQueue.offer(command)) {
//...................
        }
        else if (!addWorker(command, false))
            reject(command);

//...................

Таким образом, новый поток будет создан, только если workQueue заполнен. Вы инициализировали свой пул с new ArrayBlockingQueue<>(10). Это означает, что новые потоки не будут созданы для первых 15 одновременных запросов (corePoolSize + емкость очереди).

Чтобы увидеть, например, 6 потоков, просто попробуйте запустить ab -n 1000 -c 16 http://localhost:8080/ или уменьшить начальную емкость очереди. Но учтите, что когда очередь заполнена и число потоков равно maximumPullSize, любой новый запрос будет отклонен.

...