Producer-Consumer с ExecutorService.newFixedThreadPool - Сколько потоков создано? - PullRequest
2 голосов
/ 19 августа 2011
public class MainClass {

private static final int producerPoolSize = 10;
private static final int consumerPoolSize = 20;    

private ExecutorService prodExec = Executors.newFixedThreadPool(producerPoolSize);
private ExecutorService consExec = Executors.newFixedThreadPool(consumerPoolSize);

//main method here, which calls start() below

private void start(String[] args) {

    // Get list of ids, split them in to n(producerPoolSize) chunks

    for (int index = 0; index < producerPoolSize; index++) {
        Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
        prodExec.execute(producer);
    }

}

public class Producer implements Runnable {

    private ExecutorService consExec;
    private List<Long> list;

    public Producer(ExecutorService exec, List<Long> list) {
        this.consExec = exec;
        this.list = list;
    }

    public void run() {

       for (Long id: list) {
          data = get data from db for the id
          consExec.execute(new Consumer(data));
       }
    }
 }

 public class Consumer implements Runnable {

     public void run() {
        // call web service
     }
 }

В приведенном выше коде у меня есть два пула потоков - по одному для производителей и потребителей.Я получаю несколько идентификаторов из базы данных, разделяю их на равные куски, чтобы они передавались потокам Producer для обработки.Поток производителя получает список идентификаторов и обрабатывает каждый из них последовательно, извлекая данные для каждого из идентификаторов и отправляя эти данные в поток потребителя для обработки.Теперь мой вопрос таков:

Я создаю 10 веток продюсера выше.И я хочу, чтобы размер пула потоков потребителя был равен 20. Но при обработке каждого идентификатора источник создает новый объект Runnable (потребитель) и передает (выполняет) его в службу исполнителя приемника.Насколько я понимаю, ExecutorService заключается в том, что исполняемый файл, который вы ему отправляете, помещается в рабочий поток и затем выполняется.Итак, в приведенном выше коде, если число идентификаторов, которые получает каждый производитель, равно 50, я действительно создаю 50 * 10 = 500 потоков потребителей?Это слишком много?

Или размер пула фактически означает количество рабочих потоков?Итак, в приведенном выше коде я создаю 500 задач на Consumer executor, которые фактически будут поставлены в очередь и выполнены 20 рабочими потоками?Возможно, я не объясняю это правильно, но немного запутался здесь из-за внутренней реализации исполнителя и беспокоюсь, если я создаю слишком много потоков потребителя.

Если это не способ реализовать это, может кто-то предложитьлучший подход?Спасибо.

Ответы [ 2 ]

3 голосов
/ 19 августа 2011

Действительно ли размер пула означает количество рабочих потоков?Да.

Если процесс Runnable потребителя занимает много времени, одновременно будет выполняться только 20.Остальные будут ждать в коллекции, пока не станет доступен поток для его запуска.

А если есть лучший способ сделать это.Есть ли причина, по которой вам нужно использовать темы?Если у вас нет 20 доступных процессоров, работающих параллельно, это может не увеличить ваше время обработки, поскольку все потоки будут тратить время на переключение контекста и т. Д., Которые бесполезны для обработки данных.

Кроме того, производителиполучать все данные и хранить их в потребителях.Если потребители не могут работать, потому что у вас их 500, и только 20 могут работать одновременно, тогда вы сохраняете (500 минус 20) * данные, которые вы можете обрабатывать.У вас могут быть потребители, выбирающие свои собственные данные.

В ответ на комментарий:

вместо

for (int index = 0; index < producerPoolSize; index++) {
    Runnable producer = new Producer(consExec, chunkOfIdsForThisProducer);
    prodExec.execute(producer);
}

и Процессор

for (Long id: list) {
    data = get data from db for the id
    consExec.execute(new Consumer(data));
}

Consumer выглядит следующим образом:

public class Consumer implements Runnable {

     long myId;

     Consumer(long id){
       myId = id;
     }

     public void run() {
        data = get data from db for the id
        // do whatever a consumer does with data
     }
 }

и

private void start(String[] args) {

    // Get list of ids create a new consumer for each id

    for (int index = 0; index < everyID.length; index++) {
        consExec.execute(new Consumer(everyID[i]));
    }

}

Тогда вы теряете целый класс, и пул 20 имеет больше смысла, поскольку потребители, которые заблокированы при получении данных ввода-выводабудут ждать, и те, кто готов, могут продолжить обработку.

3 голосов
/ 19 августа 2011

Размер пула определяет количество рабочих потоков.Если вы попытаетесь отправить элемент, когда все рабочие потоки заняты, он будет помещен в очередь ExecutorService и будет запущен, как только рабочий освободится.

В javadocs произнесите следующее:

Создает пул потоков, который повторно использует фиксированный набор потоков , работающих с общей неограниченной очередью .Если какой-либо поток завершается из-за сбоя во время выполнения до выключения, новый будет занимать его место, если необходимо выполнить последующие задачи.

Обратите внимание на подсвеченные части.Количество потоков фиксированное , а очередь неограниченная , что означает, что элементы, отправленные, когда потоки заняты, всегда будут помещаться в очередь, а не отклоняться.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...