Java BlockingQueue высокая задержка в Linux - PullRequest
25 голосов
/ 03 января 2011

Я использую BlockingQueue: s (пытаюсь и ArrayBlockingQueue, и LinkedBlockingQueue) передавать объекты между различными потоками в приложении, над которым я сейчас работаю.Производительность и задержка относительно важны в этом приложении, поэтому мне было любопытно, сколько времени требуется для передачи объектов между двумя потоками с помощью BlockingQueue.Чтобы измерить это, я написал простую программу с двумя потоками (один потребитель и один производитель), где я позволил производителю передать временную метку (полученную с помощью System.nanoTime ()) потребителю, см. Код ниже.

Я помню, что читал где-то на каком-то форуме, что это заняло около 10 микросекунд для кого-то, кто пытался это сделать (не знаю, на какой ОС и оборудовании это было), поэтому я не был слишком удивлен, когда потребовалось ~ 30 микросекундя на моем Windows 7 (Intel E7500 Core 2 Duo CPU, 2,93 ГГц), в то время как многие другие приложения работают в фоновом режиме.Однако я был весьма удивлен, когда провел такой же тест на нашем гораздо более быстром Linux-сервере (два четырехъядерных процессора Intel X5677 3,46 ГГц и Debian 5 с ядром 2.6.26-2-amd64).Я ожидал, что задержка будет ниже, чем на моем окне Windows, но, наоборот, она была намного выше - ~ 75 - 100 микросекунд!Оба теста были выполнены с использованием Sun Hotspot JVM версии 1.6.0-23.

Кто-нибудь еще проводил подобные тесты с похожими результатами в Linux?Или кто-нибудь знает, почему в Linux он работает намного медленнее (с лучшим оборудованием), может ли быть так, что переключение потоков просто намного медленнее в Linux по сравнению с Windows?Если это так, кажется, что Windows на самом деле гораздо лучше подходит для каких-либо приложений.Любая помощь в понимании относительно высоких цифр очень ценится.

Редактировать:
После комментария от DaveC я также провел тест, в котором я ограничил JVM (наLinux машина) на одно ядро ​​(т.е. все потоки работают на одном ядре).Это резко изменило результаты - задержка упала до уровня ниже 20 микросекунд, то есть лучше, чем результаты на машине с Windows.Я также провел несколько тестов, в которых я ограничил поток производителя одним ядром, а поток потребителя - другим (пытаясь установить их на одном и том же сокете и на разных сокетах), но это, похоже, не помогло - задержка все еще составляла ~ 75микросекунд.Кстати, это тестовое приложение - почти все, что я запускаю на машине во время выполнения теста.

Кто-нибудь знает, имеют ли эти результаты смысл?Должно ли это быть намного медленнее, если производитель и потребитель работают на разных ядрах?Любой вклад действительно приветствуется.

Повторно отредактировано (6 января):
Я экспериментировал с различными изменениями в коде и рабочей среде:

  1. Я обновил ядро ​​Linux до 2.6.36.2 (с 2.6.26.2).После обновления ядра измеренное время изменилось до 60 микросекунд с очень небольшими изменениями, с 75-100 до обновления.Установка привязки ЦП к потокам производителя и потребителя не имела никакого эффекта, за исключением ограничения их одним и тем же ядром.При работе на том же ядре измеренная задержка составляла 13 микросекунд.

  2. В исходном коде я заставлял производителя переходить в спящий режим на 1 секунду между каждой итерацией, чтобы дать потребителю достаточно времени для вычисления истекшего времени и его вывода на консоль. Если я удаляю вызов Thread.sleep () и вместо этого позволяю как производителю, так и потребителю вызывать барьер.await () на каждой итерации (потребитель вызывает его после печати истекшего времени на консоли), измеренная задержка уменьшается с 60 микросекунд до менее 10 микросекунд. Если потоки выполняются на одном и том же ядре, задержка становится меньше 1 мкс. Кто-нибудь может объяснить, почему это так значительно уменьшило время ожидания? Моим первым предположением было то, что изменение вызвало то, что производитель вызвал queue.put () до того, как потребитель вызвал queue.take (), поэтому потребителю никогда не приходилось блокировать, но после игры с измененной версией ArrayBlockingQueue я обнаружил, что это предположение, чтобы быть ложным - потребитель фактически заблокировал. Если у вас есть другие предположения, пожалуйста, дайте мне знать. (Кстати, если я позволю производителю вызвать и Thread.sleep () и барьер.await (), задержка останется на уровне 60 микросекунд).

  3. Я также попробовал другой подход - вместо вызова queue.take () я вызвал queue.poll () с таймаутом в 100 микросекунд. Это уменьшило среднюю задержку до уровня ниже 10 микросекунд, но, конечно, намного более интенсивно использует процессор (но, вероятно, менее интенсивно использует процессор, чем занят?).

Повторно отредактировано (10 января) - проблема решена:
Ниндзяль предположил, что задержка ~ 60 микросекунд была вызвана тем, что процессору приходилось выходить из более глубоких состояний сна - и он был совершенно прав! После отключения C-состояний в BIOS задержка была уменьшена до <10 микросекунд. Это объясняет, почему у меня намного больше задержка в пункте 2 выше - когда я отправлял объекты чаще, процессор был достаточно занят, чтобы не переходить в более глубокие состояния сна. Большое спасибо всем, кто нашел время, чтобы прочитать мой вопрос и поделился своими мыслями здесь! </p>

...

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CyclicBarrier;

public class QueueTest {

    ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<Long>(10);
    Thread consumerThread;
    CyclicBarrier barrier = new CyclicBarrier(2);
    static final int RUNS = 500000;
    volatile int sleep = 1000;

    public void start() {
        consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    barrier.await();
                    for(int i = 0; i < RUNS; i++) {
                        consume();

                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        });
        consumerThread.start();

        try {
            barrier.await();
        } catch (Exception e) { e.printStackTrace(); }

        for(int i = 0; i < RUNS; i++) {
            try {
                if(sleep > 0)
                    Thread.sleep(sleep);
                produce();

            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public void produce() {
        try {
            queue.put(System.nanoTime());
        } catch (InterruptedException e) {
        }
    }

    public void consume() {
        try {
            long t = queue.take();
            long now = System.nanoTime();
            long time = (now - t) / 1000; // Divide by 1000 to get result in microseconds
            if(sleep > 0) {
                System.out.println("Time: " + time);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        QueueTest test = new QueueTest();
        System.out.println("Starting...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start();
        // Run again, printing the results
        System.out.println("Starting again...");
        test.sleep = 1000;
        test.start();
    }
}

Ответы [ 4 ]

6 голосов
/ 03 января 2011

Ваш тест не является хорошей мерой задержки передачи обслуживания очереди, потому что у вас есть один поток, считывающий очередь, который записывает синхронно в System.out (выполняет строку и длинную конкатенацию, пока она в нем), прежде чем он будет выполнен снова.Чтобы измерить это должным образом, вам нужно вывести это упражнение из этой цепочки и выполнить как можно меньше работы в цепочке.

Было бы лучше просто выполнить вычисления (тогда-сейчас) в получателе и добавить результат в какую-то другую коллекцию, которая периодически выводится другим потоком, который выводит результаты.Я склонен делать это, добавляя к должным образом подготовленной структуре массива на основе массива, доступ к которой осуществляется через AtomicReference (следовательно, поток отчетов просто должен получить getAndSet для этой ссылки с другим экземпляром этой структуры хранения, чтобы получить последний пакет результатов; например, make 2списки, установите один как активный, каждый поток xsa просыпается и меняет местами активные и пассивные).Затем вы можете сообщить о некотором распределении вместо каждого отдельного результата (например, децильный диапазон), что означает, что вы не генерируете огромные файлы журналов при каждом запуске и не получаете полезную информацию, напечатанную для вас.

FWIW Я согласен со временемПитер Лоури заявил, что если задержка действительно критична, вам нужно подумать о том, чтобы ждать с соответствующей привязкой к процессору (т. Е. Выделить ядро ​​для этого потока)

РЕДАКТИРОВАТЬ после 6 января 1010 *

Если я удаляю вызов Thread.sleep () и вместо этого позволяю как производителю, так и потребителю вызывать барьер.await () на каждой итерации (потребитель вызывает его после печати истекшего временина консоль), измеренная задержка уменьшается с 60 микросекунд до менее 10 микросекунд.Если потоки выполняются на одном и том же ядре, задержка становится меньше 1 мкс.Кто-нибудь может объяснить, почему это так значительно уменьшило время ожидания?

Вы смотрите на разницу между java.util.concurrent.locks.LockSupport#park (и соответствующими unpark) и Thread#sleep.Большинство соков основано на LockSupport (часто через AbstractQueuedSynchronizer, который ReentrantLock предоставляет или напрямую), и это (в Hotspot) разрешается до sun.misc.Unsafe#parkunpark), и это имеет тенденцию заканчиваться вруки pthread (posix темы) lib.Обычно pthread_cond_broadcast просыпается и pthread_cond_wait или pthread_cond_timedwait для таких вещей, как BlockingQueue#take.

Не могу сказать, что я когда-либо смотрел на то, как на самом деле реализуется Thread#sleep (потому что я никогда не сталкивался с чем-то с низкой задержкой, не зависящим от условий), но я бы предположил, что этозаставляет его понижать в расписании более агрессивно, чем механизм сигнализации pthread, и именно это объясняет разницу в задержке.

3 голосов
/ 03 января 2011

Я бы использовал ArrayBlockingQueue, если вы можете.Когда я использовал его, задержка в Linux составляла от 8 до 18 микросекунд.Обратите внимание:

  • Стоимость - это в основном время, необходимое для пробуждения нити.Когда вы пробуждаете поток, его данные / код не будут в кеше, так что вы обнаружите, что если вы посчитаете, что произойдет после того, как поток проснется, это может занять в 2-5 раз больше, чем если бы вы выполняли одно и то же несколько раз.1004 *
  • Некоторые операции используют вызовы ОС (такие как блокировка / циклические барьеры), они часто более дороги в сценарии с низкой задержкой, чем ожидание занятости.Я предлагаю пытаться ждать вашего производителя, а не использовать CyclicBarrier.Вы также можете ждать своего потребителя, но в реальной системе это может быть неоправданно дорого.
1 голос
/ 03 января 2011

@ Peter Lawrey

Некоторые операции используют вызовы ОС (например, блокировка / циклические барьеры)

Это НЕ ОС (ядро)звонки.Реализовано с помощью простого CAS (который на x86 поставляется без свободной памяти)

Еще один: не используйте ArrayBlockingQueue, если вы не знаете, почему (вы его используете).

@ OP: Посмотрите наThreadPoolExecutor, он предлагает отличную структуру производителя / потребителя.

Изменить ниже :

, чтобы уменьшить задержку (исключая ожидание занятости), измените очередь на SynchronousQueue и добавьте следующеекак до запуска потребителя

...
consumerThread.setPriority(Thread.MAX_PRIORITY);
consumerThread.start();

Это лучшее, что вы можете получить.


Edit2: Здесь с синхронизацией.очередь.И не распечатывать результаты.

package t1;

import java.math.BigDecimal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;

public class QueueTest {

    static final int RUNS = 250000;

    final SynchronousQueue<Long> queue = new SynchronousQueue<Long>();

    int sleep = 1000;

    long[] results  = new long[0];
    public void start(final int runs) throws Exception {
        results = new long[runs];
        final CountDownLatch barrier = new CountDownLatch(1);
        Thread consumerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                barrier.countDown();
                try {

                    for(int i = 0; i < runs; i++) {                        
                        results[i] = consume(); 

                    }
                } catch (Exception e) {
                    return;
                } 
            }
        });
        consumerThread.setPriority(Thread.MAX_PRIORITY);
        consumerThread.start();


        barrier.await();
        final long sleep = this.sleep;
        for(int i = 0; i < runs; i++) {
            try {                
                doProduce(sleep);

            } catch (Exception e) {
                return;
            }
        }
    }

    private void doProduce(final long sleep) throws InterruptedException {
        produce();
    }

    public void produce() throws InterruptedException {
        queue.put(new Long(System.nanoTime()));//new Long() is faster than value of
    }

    public long consume() throws InterruptedException {
        long t = queue.take();
        long now = System.nanoTime();
        return now-t;
    }

    public static void main(String[] args) throws Throwable {           
        QueueTest test = new QueueTest();
        System.out.println("Starting + warming up...");
        // Run first once, ignoring results
        test.sleep = 0;
        test.start(15000);//10k is the normal warm-up for -server hotspot
        // Run again, printing the results
        System.gc();
        System.out.println("Starting again...");
        test.sleep = 1000;//ignored now
        Thread.yield();
        test.start(RUNS);
        long sum = 0;
        for (long elapsed: test.results){
            sum+=elapsed;
        }
        BigDecimal elapsed = BigDecimal.valueOf(sum, 3).divide(BigDecimal.valueOf(test.results.length), BigDecimal.ROUND_HALF_UP);        
        System.out.printf("Avg: %1.3f micros%n", elapsed); 
    }
}
0 голосов
/ 04 января 2011

Если задержка критична и вам не требуется строгая семантика FIFO, то вы можете рассмотреть LinkedTransferQueue JSR-166. Это позволяет исключить, чтобы противоположные операции могли обмениваться значениями вместо синхронизации в структуре данных очереди. Этот подход помогает уменьшить конфликты, обеспечивает параллельный обмен и позволяет избежать штрафов за спящий режим / пробуждение.

...