Параллельная и блокирующая очередь в Java - PullRequest
27 голосов
/ 31 июля 2009

У меня есть классическая проблема потока, отправляющего события во входящую очередь второго потока. Только на этот раз я очень заинтересован в производительности. Чего я хочу добиться:

  • Я хочу одновременный доступ к очереди, продюсер продвигается, получатель всплывает.
  • Когда очередь пуста, я хочу, чтобы потребитель заблокировал очередь, ожидая производителя.

Моей первой идеей было использовать LinkedBlockingQueue, но вскоре я понял, что это не одновременно, и производительность пострадала. С другой стороны, сейчас я использую ConcurrentLinkedQueue, но все же я плачу стоимость wait() / notify() на каждую публикацию. Поскольку потребитель, обнаружив пустую очередь, не блокируется, мне приходится синхронизироваться и wait() на блокировку. С другой стороны, производитель должен получить эту блокировку и notify() при каждой публикации. В итоге я плачу за sycnhronized (lock) {lock.notify()} в каждой публикации, даже если она не нужна.

То, что, я думаю, здесь необходимо, это очередь, которая является одновременно и блокирующей, и параллельной. Я представляю себе операцию push() для работы, как в ConcurrentLinkedQueue, с дополнительным notify() к объекту, когда толкаемый элемент является первым в списке. Такая проверка, я считаю, уже существует в ConcurrentLinkedQueue, так как нажатие требует соединения со следующим элементом. Таким образом, это будет намного быстрее, чем синхронизация каждый раз на внешней блокировке.

Что-то подобное доступно / разумно?

Ответы [ 6 ]

12 голосов
/ 31 июля 2009

Я думаю, что вы можете придерживаться java.util.concurrent.LinkedBlockingQueue независимо от ваших сомнений. Это одновременно. Хотя я понятия не имею о его производительности. Возможно, другая реализация BlockingQueue подойдет вам лучше. Их не так уж много, поэтому проводите тесты производительности и измеряйте.

6 голосов
/ 28 августа 2012

Аналогично этому ответу https://stackoverflow.com/a/1212515/1102730, но немного по-другому .. В итоге я использовал ExecutorService. Вы можете создать его с помощью Executors.newSingleThreadExecutor(). Мне нужна была параллельная очередь для чтения / записи BufferedImages в файлы, а также атомарность для чтения и записи. Мне нужен только один поток, потому что файловый ввод-вывод на несколько порядков быстрее, чем исходный, чистый ввод-вывод. Кроме того, меня больше беспокоит атомарность действий и правильность, чем производительность, но этот подход также может быть реализован с несколькими потоками в пуле, чтобы ускорить процесс.

Чтобы получить изображение (Try-Catch-Наконец опущено):

Future<BufferedImage> futureImage = executorService.submit(new Callable<BufferedImage>() {
    @Override
        public BufferedImage call() throws Exception {
            ImageInputStream is = new FileImageInputStream(file);
            return  ImageIO.read(is);
        }
    })

image = futureImage.get();

Чтобы сохранить изображение (Try-Catch-Наконец опущено):

Future<Boolean> futureWrite = executorService.submit(new Callable<Boolean>() {
    @Override
    public Boolean call() {
        FileOutputStream os = new FileOutputStream(file); 
        return ImageIO.write(image, getFileFormat(), os);  
    }
});

boolean wasWritten = futureWrite.get();

Важно отметить, что вы должны очищать и закрывать свои потоки в блоке finally. Я не знаю, как он работает по сравнению с другими решениями, но он довольно универсален.

5 голосов
/ 31 июля 2009

Я бы посоветовал вам взглянуть на ThreadPoolExecutor newSingleThreadExecutor. Он будет поддерживать сохранение ваших заданий, заказанных для вас, и если вы отправите Callables вашему исполнителю, вы также сможете получить искомое блокирующее поведение.

4 голосов
/ 01 августа 2009

Вы можете попробовать LinkedTransferQueue от jsr166: http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166y/

Он удовлетворяет вашим требованиям и имеет меньше накладных расходов на операции предложения / опроса. Как видно из кода, когда очередь не пуста, она использует атомарные операции для опроса элементов. И когда очередь пуста, она вращается в течение некоторого времени и припаркует поток в случае неудачи. Я думаю, что это может помочь в вашем случае.

3 голосов
/ 31 июля 2009

Я использую ArrayBlockingQueue всякий раз, когда мне нужно передать данные из одного потока в другой. Использование методов put и take (которые будут блокироваться, если они заполнены / пусты).

2 голосов
/ 31 июля 2009

Вот список классов, реализующих BlockingQueue.

Я бы порекомендовал проверить SynchronousQueue.

Как и @Rorick, упомянутый в его комментарии, я считаю, что все эти реализации являются параллельными. Я думаю, что ваши проблемы с LinkedBlockingQueue могут быть неуместны.

...