Как отправить Списокв ThreadPoolExecutor, и каждый поток выберет один LinkedBlockingQueue и выполнит его параллельно - PullRequest
0 голосов
/ 03 апреля 2019

Я отправляю список LinkedBlockingQueue типа Long в ThreadPoolExecutor, и условие должно быть, так как каждый поток выбирает LinkedBlockingQueue long и выполняется параллельно

Это моя методическая логика

public void doParallelProcess() {

    List<LinkedBlockingQueue<Long>> linkedBlockingQueueList = splitListtoBlockingQueues();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(), 0L,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory());
    Long initial = System.currentTimeMillis();
    try {

        System.out.println("linkedBlockingQueueList begin size is " + linkedBlockingQueueList.size() + "is empty"
                + linkedBlockingQueueList.isEmpty());

        while (true) {
            linkedBlockingQueueList.parallelStream().parallel().filter(q -> !q.isEmpty()).forEach(queue -> {
                Long id = queue.poll();
                MyTestRunnable runnab = new MyTestRunnable(id);
                executor.execute(runnab);
                System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
                        + executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
            });

            System.out.println("linkedBlockingQueueList end size is " + linkedBlockingQueueList.size() + "is empty"
                    + linkedBlockingQueueList.isEmpty());

            System.out.println("executor service " + executor);

            if (executor.getCompletedTaskCount() == (long) mainList.size()) {
                break;
            }

            while (executor.getActiveCount() != 0) {
                System.out.println("Task Count: " + executor.getTaskCount() + ", Completed Task Count: "
                        + executor.getCompletedTaskCount() + ", Active Task Count: " + executor.getActiveCount());
                Thread.sleep(1000L);
            }

        }
    } catch (Exception e) {
    } finally {
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
    }
} `

Как отправить список LinkedBlockingQueue отдельной теме пример:

  1. List<LinkedBlockingQueue<Long>> каждый LinkedBlockingQueue содержит 50 данных очереди
  2. размер List<LinkedBlockingQueue<Long>> 50
  3. каждый поток должен выбрать одну LinkedBlockingQueue<Long> и выполнить 50 очередей задачи.

Ответы [ 2 ]

1 голос
/ 04 апреля 2019

Вход для ExecutorService - либо Runnable, либо Callable. Любая задача, которую вы отправляете, должна реализовывать один из этих двух интерфейсов. Если вы хотите отправить группу задач в пул потоков и подождать, пока они не будут завершены, вы можете использовать метод invokeAll и выполнить цикл по полученным Future s, вызывая get для каждого : см. этот информативный ответ на аналогичный вопрос.

Вам не нужно группировать входные задачи в группы. Вы никогда не хотите, чтобы у службы executor были незанятые потоки, пока еще есть над чем работать! Вы хотите, чтобы он мог захватить следующую задачу, как только освободятся ресурсы, и группирование таким способом противоречит этому. Ваш код делает это:

while non-empty input lists exist {
    for each non-empty input list L {
        t = new Runnable(L.pop())
        executor.submit(t)
    }
    while (executor.hasTasks()) {
        wait
    }
}

Как только одна из этих задач завершится, этот поток сможет свободно переходить к другой работе. Но это не так, потому что вы ждете, пока все N задач завершатся, прежде чем отправлять больше. Отправьте их все сразу с помощью invokeAll и позвольте службе исполнителя выполнить то, для чего она была создана.

0 голосов
/ 03 апреля 2019

Класс Executors является вашей основной записью в пулах потоков:

    ExecutorService executor = Executors.newCachedThreadPool();
    linkedBlockingQueueList.forEach(queue -> executor.submit(() -> { /* process queue */ }));

Если вы хотите создать ThreadPoolExecutor самостоятельно - это дает вам больший контроль над конфигурацией - есть как минимум два способа указать фабрику потоков по умолчанию:

  1. Оставьте аргумент фабрики потоков:

        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(),
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    
  2. Используйте класс Executors еще раз для получения фабрики потоков по умолчанию:

        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, linkedBlockingQueueList.size(),
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
                Executors.defaultThreadFactory());
    
...