Java: Producer = Consumer, как узнать, когда остановиться? - PullRequest
6 голосов
/ 03 апреля 2012

У меня есть несколько рабочих, которые используют ArrayBlockingQueue.

Каждый работник берет один объект из очереди, обрабатывает его, и в результате может получить несколько объектов, которые будут помещены в очередь для дальнейшей обработки.Итак, работник = производитель + потребитель.

работник:

public class Worker implements Runnable
{
    private BlockingQueue<String> processQueue = null;

    public Worker(BlockingQueue<String> processQueue)
    {
        this.processQueue = processQueue;
    }

    public void run()
    {
        try
        {
            do
            {
                String item = this.processQueue.take();
                ArrayList<String> resultItems = this.processItem(item);

                for(String resultItem : resultItems)
                {
                    this.processQueue.put(resultItem);
                }
            }
            while(true);
        }
        catch(Exception)
        {
            ...
        }
    }

    private ArrayList<String> processItem(String item) throws Exception
    {
        ...
    }
}

Основной:

public class Test
{
    public static void main(String[] args) throws Exception
    {
        new Test().run();
    }

    private void run() throws Exception
    {
        BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
        processQueue.put("lalala");

        Executor service = Executors.newFixedThreadPool(100);
        for(int i=0; i<100; ++i)
        {
            service.execute(new Worker(processQueue));
        }
    }
}

Каков наилучший способ остановить работников, когда больше нет работы?

Во-первых, я имею в виду, периодически проверять, сколько элементов в очереди и сколько элементов в настоящее время находятся впроцесс.Если оба значения равны нулю, выполните что-то вроде shutdownNow () в ExecutorService.Но я не уверен, что это лучший способ.

Ответы [ 3 ]

2 голосов
/ 03 апреля 2012

Если больше нечем заняться, поместите в очередь сообщение, в котором говорится об этом, и пусть рабочие отключатся по собственному усмотрению. Это хороший способ предотвратить повреждение данных.

Если вам нужно уведомить другую ветку о том, что все работники ушли домой, вы можете использовать CountDownLatch для этого.

1 голос
/ 03 апреля 2012

Похоже, у вас есть решение - используйте отдельную очередь в процессе выполнения, размер которой будет равен количеству обрабатываемых в данный момент элементов. Если вы используете соглашение о том, что доступ к любой очереди находится в блоках synchronized(theArrayBlockingQueue), тогда все должно быть хорошо. В частности, при перемещении элемента в состояние обработки удалите его из ArrayBlockingQueue и добавьте его вQQueue в пределах того же синхронизированного блока.

0 голосов
/ 04 апреля 2012

Я немного исправил ваш код, не уверен, что вы ожидаете этого, но, по крайней мере, он заканчивается!Если вы используете shutdownNow вместо shutdown, ваши рабочие будут прерваны и, если вы не вернете их к работе, выйдут без гарантии, что очередь пуста.

public class Test {

    public static void main(String[] args) throws Exception {
        new Test().run();
    }

    private void run() throws Exception {
        BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
        processQueue.put("lalalalalalalalalalalalala"); //a little longer to make sure there is enough to process

        ExecutorService service = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; ++i) {
            service.execute(new Worker(processQueue));
        }
        service.shutdown(); //orderly shutdown = lets the tasks terminate what they are doing
        service.awaitTermination(1, TimeUnit.SECONDS); //blocks until all tasks have finished or throws TimeOutException if timeout is reached
    }

    public static class Worker implements Runnable {

        private BlockingQueue<String> processQueue = null;
        private int count = 0;

        public Worker(BlockingQueue<String> processQueue) {
            this.processQueue = processQueue;
        }

        @Override
        public void run() {
            try {
                do {
                    //tries to get something from the queue for 100ms and returns null if it could not get anything
                    String item = this.processQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (item == null) break; //Ends the job because the queue was empty
                    count++;
                    List<String> resultItems = this.processItem(item);

                    for (String resultItem : resultItems) {
                        this.processQueue.put(resultItem);
                    }
                } while (true);
            } catch (InterruptedException e) {
                System.out.println("Interrupted");
                Thread.currentThread().interrupt();
            }
            if (count != 0) System.out.println(Thread.currentThread() + ": processed " + count + " entries");
        }

        private List<String> processItem(String item) { //let's put the string back less final character
            if (item.isEmpty()) {
                return Collections.<String> emptyList();
            } else {
                return Arrays.asList(item.substring(0, item.length() - 1));
            }
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...