Реализация BlockingQueue: чем отличаются SynchronousQueue от LinkedBlockingQueue - PullRequest
32 голосов
/ 24 февраля 2011

Я вижу эти реализации BlockingQueue и не могу понять различия между ними. Мой вывод пока:

  1. Мне никогда не понадобится SynchronousQueue
  2. LinkedBlockingQueue обеспечивает FIFO, BlockingQueue необходимо создать с параметром true, чтобы сделать его FIFO
  3. SynchronousQueue разбивает метод большинства коллекций (содержит, размер и т. Д.)

Так, когда мне когда-либо понадобится SynchronousQueue ? Является ли производительность этой реализации лучше, чем LinkedBlockingQueue ?

Чтобы сделать его более сложным ... почему Executors.newCachedThreadPool использует SynchronousQueue, когда другие ( Executors.newSingleThreadExecutor и Executors.newFixedThreadPool ) используют LinkedBloueQ

EDIT

Первый вопрос решен. Но я все еще не понимаю, почему Executors.newCachedThreadPool использует SynchronousQueue, когда другие ( Executors.newSingleThreadExecutor и Executors.newFixedThreadPool ) используют LinkedBlockingQueue?

То, что я получаю, с SynchronousQueue, производитель будет заблокирован, если нет свободного потока. Но поскольку количество потоков практически не ограничено (новые потоки будут создаваться при необходимости), этого никогда не произойдет. Так почему он должен использовать SynchronousQueue?

Ответы [ 3 ]

52 голосов
/ 24 февраля 2011

SynchronousQueue - это очень особый вид очереди - он реализует подход рандеву (производитель ждет, пока потребитель готов, потребитель ждет, пока производитель готов) за интерфейсом Queue.

Поэтому он может понадобиться только в особых случаях, когда вам нужна конкретная семантика, например, Однопоточность задачи без постановки в очередь дальнейших запросов .

Другой причиной использования SynchronousQueue является производительность. Реализация SynchronousQueue кажется сильно оптимизированной, поэтому если вам не нужно ничего, кроме точки встречи (как в случае Executors.newCachedThreadPool(), где потребители создаются «по требованию», так что элементы очереди не ' т), вы можете получить прирост производительности, используя SynchronousQueue.

Простой синтетический тест показывает, что в простом сценарии с одним производителем и одним потребителем на двухъядерной машине пропускная способность SynchronousQueue в ~ 20 раз выше, чем пропускная способность LinkedBlockingQueue и ArrayBlockingQueue с длиной очереди = 1. Когда очередь длина увеличивается, их пропускная способность увеличивается и почти достигает пропускной способности SynchronousQueue. Это означает, что SynchronousQueue имеет низкие издержки синхронизации на многоядерных машинах по сравнению с другими очередями. Но опять же, это имеет значение только в определенных обстоятельствах, когда вам нужна точка встречи, замаскированная под Queue.

EDIT:

Вот тест:

public class Test {
    static ExecutorService e = Executors.newFixedThreadPool(2);
    static int N = 1000000;

    public static void main(String[] args) throws Exception {    
        for (int i = 0; i < 10; i++) {
            int length = (i == 0) ? 1 : i * 5;
            System.out.print(length + "\t");
            System.out.print(doTest(new LinkedBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new ArrayBlockingQueue<Integer>(length), N) + "\t");
            System.out.print(doTest(new SynchronousQueue<Integer>(), N));
            System.out.println();
        }

        e.shutdown();
    }

    private static long doTest(final BlockingQueue<Integer> q, final int n) throws Exception {
        long t = System.nanoTime();

        e.submit(new Runnable() {
            public void run() {
                for (int i = 0; i < n; i++)
                    try { q.put(i); } catch (InterruptedException ex) {}
            }
        });    

        Long r = e.submit(new Callable<Long>() {
            public Long call() {
                long sum = 0;
                for (int i = 0; i < n; i++)
                    try { sum += q.take(); } catch (InterruptedException ex) {}
                return sum;
            }
        }).get();
        t = System.nanoTime() - t;

        return (long)(1000000000.0 * N / t); // Throughput, items/sec
    }
}    

А вот результат на моей машине:

enter image description here

5 голосов
/ 25 февраля 2011

В настоящее время по умолчанию Executors (на основе ThreadPoolExecutor) можно использовать набор предварительно созданных потоков фиксированного размера и BlockingQueue некоторого размера для любого переполнения или создать потоки до максимального размера, если (и только если) эта очередь заполнена.

Это приводит к некоторым удивительным свойствам. Например, поскольку дополнительные потоки создаются только после достижения емкости очереди, использование LinkedBlockingQueue (что не ограничено) означает, что новые потоки никогда не будут созданы, даже если текущий размер пула равен нулю. Если вы используете ArrayBlockingQueue, то новые потоки создаются только в том случае, если он заполнен, и существует разумная вероятность того, что последующие задания будут отклонены, если к тому времени пул еще не очистил пространство.

A SynchronousQueue имеет нулевую емкость, поэтому производитель блокируется до тех пор, пока потребитель не станет доступным или не будет создан поток. Это означает, что, несмотря на впечатляющие цифры @axtavt, пул кэшированных потоков обычно имеет худшую производительность с точки зрения производителя.

К сожалению, в настоящее время нет хорошей библиотечной версии компромиссной реализации, которая будет создавать потоки во время пакетов или действий до некоторого максимума от низкого минимума. У вас есть либо растущий бассейн, либо фиксированный. У нас он есть внутри, но он еще не готов для общественного потребления.

4 голосов
/ 24 февраля 2011

Пул потоков кеша создает потоки по требованию.Ему нужна очередь, которая либо передает задачу ожидающему потребителю, либо завершается с ошибкой.Если нет ожидающего потребителя, он создает новый поток.SynchronousQueue не содержит элемент, вместо этого он передает элемент или завершается ошибкой.

...