Producer Consumer - Использование Executors.newFixedThreadPool - PullRequest
7 голосов
/ 11 августа 2011

Мое понимание шаблона «производитель-потребитель» состоит в том, что он может быть реализован с использованием очереди, совместно используемой производителем и потребителем.Производитель отправляет работу в общую очередь, потребитель получает ее и обрабатывает.Он также может быть реализован производителем, напрямую отправляющим его потребителю (потоки производителя, напрямую отправляющему в службу исполнителя Consumer).

Теперь я смотрю на класс Executors, который предоставляет некоторые распространенные реализации пулов потоков.Метод newFixedThreadPool, согласно спецификации, «повторно использует фиксированное количество потоков, работающих в общей неограниченной очереди».О какой очереди они здесь говорят?

Если источник напрямую отправляет задачу потребителю, это внутренняя очередь ExecutorService, содержащая список Runnables?

Или это промежуточная очередь, если производитель отправляетв общую очередь?

Может быть, я упускаю весь смысл, но кто-нибудь, пожалуйста, уточните?

Ответы [ 3 ]

4 голосов
/ 12 августа 2011

Вы правы, ExecutorService - это не только пул потоков, но и полная реализация для потребителя. Эта внутренняя очередь фактически является поточно-ориентированной очередью Runnable с (точнее FutureTask), содержащей задачи, которые вы submit().

Все потоки в пуле заблокированы в этой очереди, ожидая выполнения задач. Когда вы submit() выполняете задачу, ровно один поток ее подхватит и запустит. Конечно, submit() не ждет, пока поток в пуле завершит обработку.

С другой стороны, если вы отправляете огромное количество задач (или долго выполняющихся), вы можете получить все занятые потоки в пуле и некоторые задачи, ожидающие в очереди. Как только любой поток выполнит свою задачу, он сразу выберет первый из очереди.

1 голос
/ 12 июня 2012
public class Producer extends Thread {  
    static List<String> list = new ArrayList<String>();  

    public static void main(String[] args) {  
        ScheduledExecutorService executor = Executors  
                .newScheduledThreadPool(12);  
        int initialDelay = 5;  
        int pollingFrequency = 5;  
        Producer producer = new Producer();  
        @SuppressWarnings({ "rawtypes", "unused" })  
        ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay(  
                producer, initialDelay, pollingFrequency, TimeUnit.SECONDS);  
        for (int i = 0; i < 3; i++) {  
            Consumer consumer = new Consumer();  
            @SuppressWarnings({ "rawtypes", "unused" })  
            ScheduledFuture schedFutureConsumer = executor  
                    .scheduleWithFixedDelay(consumer, initialDelay,  
                            pollingFrequency, TimeUnit.SECONDS);  
        }  

    }  

    @Override  
    public void run() {  
        list.add("object added to list is " + System.currentTimeMillis());  
                              ///adding in list become slow also because of synchronized behavior  
    }  
}  

class Consumer extends Thread {  

    @Override  
    public void run() {  
        getObjectFromList();  
    }  

    private void getObjectFromList() {  
        synchronized (Producer.list) {  
            if (Producer.list.size() > 0) {  
                System.out.println("Object name removed by "  
                        + Thread.currentThread().getName() + "is "  
                        + Producer.list.get(0));  
                Producer.list.remove(Producer.list.get(0));  
            }  
        }  
    }  
}  
0 голосов
/ 12 августа 2011

Проверьте это:
Пример производителя-потребителя на Java (RabbitMQ) (он написан для другой библиотеки, но на Java и четко демонстрирует концепцию;)
Надеюсь, это поможет!

П.С.: На самом деле, есть несколько примеров, но вы понимаете;)

...