ConcurrentLinkedQueue с wait () и notify () - PullRequest
5 голосов
/ 11 января 2012

Я не очень разбираюсь в многопоточности. Я пытаюсь сделать снимок экрана неоднократно одним потоком производителя, который добавляет объект BufferedImage к ConcurrentLinkedQueue, и Поток потребителя будет poll очередь для объекта BufferedImage для сохранения их в файл. Я мог бы потреблять их путем повторного опроса (пока цикл), но я не знаю, как потреблять их, используя notify() и wait(). Я пытался использовать wait() и notify в небольших программах, но не смог реализовать это здесь.

У меня есть следующий код:

class StartPeriodicTask implements Runnable {
    public synchronized void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        if(null!=queue.peek()){
            try {
                System.out.println("Empty queue, so waiting....");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }else{
            queue.add(image);
            notify();
        }
    }
}

public class ImageConsumer implements Runnable {
        @Override
        public synchronized void run() {
            while (true) {
                BufferedImage bufferedImage = null;
                if(null==queue.peek()){
                    try {
                        //Empty queue, so waiting....
                        wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }else{
                    bufferedImage = queue.poll();
                    notify();
                }
                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
                }
            }

Ранее у меня был повторный опрос для проверки существования BufferedImage объекта. Теперь я изменил метод run на synchronised и попытался реализовать wait() и notify(). Я правильно делаю? Пожалуйста помоги. Спасибо.

Ответы [ 3 ]

5 голосов
/ 12 января 2012

Первая проблема - это ненужное ожидание у вашего производителя:

    if(null!=queue.peek()){ // You are the producer, you don't care if the queue is empty
        try {
            System.out.println("Empty queue, so waiting....");
            wait(); // This puts you to bed, your waiting and so is your consumer
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }else{
        queue.add(image);
        notify();
    }

Это все, что вам нужно:

        queue.add(image);
        notify();

Следующая проблема - ненужная notify у вашего потребителя.Он дает контроль над своей обработкой в ​​тот момент, который, как я полагаю, вы намеревались использовать для продвижения своего продюсера, но, конечно, ваш код никогда не достигнет этой точки.Итак, это:

            }else{
                bufferedImage = queue.poll();
                notify();
            }
            File imageFile = getFile();
            if (!imageFile.getParentFile().exists()) {
                imageFile.getParentFile().mkdirs();
            }
                try {
                    ImageIO.write(bufferedImage, extension, imageFile);
                    //Image saved
                catch (IOException e) {
                    tracer.severe("IOException occurred. Image is not saved to file!");
                }
            }

Должно выглядеть так:

            }else{
                bufferedImage = queue.poll();

                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                   imageFile.getParentFile().mkdirs();
                }

                try {
                    ImageIO.write(bufferedImage, extension, imageFile);
                    //Image saved
                catch (IOException e) {
                    tracer.severe("IOException occurred. Image is not saved to file!");
                }
            }
5 голосов
/ 12 января 2012

Вы используете неправильный Queue для работы. ConcurrentLinkedQueue - это неблокирующая очередь, что означает отсутствие семантики потребителя производителя. Если вы просто работаете с одним читателем и одним писателем, взгляните на SynchronousQueue

Проще говоря, ваш код можно переписать как таковой

BlockingQueue<?> queue = new SynchrnousQueue<?>();
class StartPeriodicTask implements Runnable {
    public void run() {
        Robot robot = null;
        try {
            robot = new Robot();
        } catch (AWTException e1) {
            e1.printStackTrace();
        }
        Rectangle screenRect = new Rectangle(Toolkit.getDefaultToolkit()
                .getScreenSize());
        BufferedImage image = robot.createScreenCapture(screenRect);
        queue.offer(image); //1
}
public class ImageConsumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                BufferedImage bufferedImage = queue.poll(); //2

                File imageFile = getFile();
                if (!imageFile.getParentFile().exists()) {
                    imageFile.getParentFile().mkdirs();
                }
                    try {
                        ImageIO.write(bufferedImage, extension, imageFile);
                        //Image saved
                    catch (IOException e) {
                        tracer.severe("IOException occurred. Image is not saved to file!");
                    }
            }

Это действительно так.

Позвольте мне объяснить. В строке // 1 производящий поток «поместит» изображение в очередь. Я цитирую место, потому что SynchrnousQueue не имеет глубины. На самом деле поток сообщает очереди: «Если есть потоки, запрашивающие элемент из этой очереди, передайте ему этот поток и позвольте мне продолжить. Если нет, я подожду, пока другой поток не будет готов» *

Строка // 2 похожа на 1, где потребляющий поток просто ждет, пока поток не предложит. Это прекрасно работает с одним читателем одного писателя

4 голосов
/ 12 января 2012

Как только библиотека java.util.concurrent вошла в JDK1.5, необходимость написать свою собственную логику ожидания / уведомления пришла прямо к двери.В 2012 году, если вы делаете свое собственное ожидание / уведомление, вы слишком усердно работаете и должны строго учитывать проверенные и истинные эквиваленты java.util.concurrent.встроенный java.util.concurrent.ConcurrentLinkedQueue.Другими словами, потребители сидят в своих собственных элементах Thread и .poll () из ConcurrentLinkedQue, пока оно равно !isEmpty().Большинство реализаций, которые я видел, бросают какой-то сон на одну секунду между тестами !isEmpty(), но я не думаю, что это действительно необходимо.Также обратите внимание на комментарий парня из Vint на мой ответ: .poll() может вернуть null.Рассмотрим альтернативные реализации java.util.AbstractQueue, которые могут иметь поведение блокировки ближе к тому, что вы ищете.

У этого парня простой пример: http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4

Наконец, получите книгу Гетца "JavaПараллелизм на практике »и читайте его.Я почти уверен, что у него есть рецепт того, что можно использовать для замены собственного собственного ожидания / уведомлений.

...