Java NIO Pipe против BlockingQueue - PullRequest
       10

Java NIO Pipe против BlockingQueue

10 голосов
/ 26 сентября 2011

Я только что обнаружил, что у него есть средство NIO, Java NIO Pipe, предназначенное для передачи данных между потоками.Есть ли какое-либо преимущество использования этого механизма по сравнению с более обычной передачей сообщений по очереди, такой как ArrayBlockingQueue?

Ответы [ 4 ]

6 голосов
/ 26 сентября 2011

Обычно самый простой способ передать данные для обработки другим потоком - это использовать ExecutorService.Это оборачивает очередь и пул потоков (может иметь один поток)

Вы можете использовать канал, когда у вас есть библиотека, которая поддерживает каналы NIO.Это также полезно, если вы хотите передавать ByteBuffers данных между потоками.

В противном случае обычно проще / быстрее использовать ArrayBlockingQueue.

Если вы хотите более быстрый способ обмена данными между потоками, ясоветуем вам взглянуть на Exchanger , однако он не так универсален, как ArrayBlockingQueue.

Exchanger и Java без GC

3 голосов
/ 25 марта 2012

Поэтому после многих проблем с конвейером ( проверьте здесь ) я решил отдать предпочтение неблокирующим параллельным очередям по каналам NIO.Итак, я сделал несколько тестов на Java ConcurrentLinkedQueue.См. Ниже:

public static void main(String[] args) throws Exception {

    ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();

    // first test nothing:

    for (int j = 0; j < 20; j++) {

        Benchmarker bench = new Benchmarker();

        String s = "asd";

        for (int i = 0; i < 1000000; i++) {
            bench.mark();
            // s = queue.poll();
            bench.measure();
        }

        System.out.println(bench.results());

        Thread.sleep(100);
    }

    System.out.println();

    // first test empty queue:

    for (int j = 0; j < 20; j++) {

        Benchmarker bench = new Benchmarker();

        String s = "asd";

        for (int i = 0; i < 1000000; i++) {
            bench.mark();
            s = queue.poll();
            bench.measure();
        }

        System.out.println(bench.results());

        Thread.sleep(100);
    }

    System.out.println();

    // now test polling one element on a queue with size one

    for (int j = 0; j < 20; j++) {

        Benchmarker bench = new Benchmarker();

        String s = "asd";
        String x = "pela";

        for (int i = 0; i < 1000000; i++) {
            queue.offer(x);
            bench.mark();
            s = queue.poll();
            bench.measure();
            if (s != x) throw new Exception("bad!");
        }

        System.out.println(bench.results());

        Thread.sleep(100);
    }

    System.out.println();

    // now test polling one element on a queue with size two

    for (int j = 0; j < 20; j++) {

        Benchmarker bench = new Benchmarker();

        String s = "asd";
        String x = "pela";

        for (int i = 0; i < 1000000; i++) {
            queue.offer(x);
            queue.offer(x);
            bench.mark();
            s = queue.poll();
            bench.measure();
            if (s != x) throw new Exception("bad!");
            queue.poll();
        }

        System.out.println(bench.results());

        Thread.sleep(100);
    }
}

Результаты:

totalLogs=1000000, minTime=0, maxTime=85000, avgTime=58.61 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=5281000, avgTime=63.35 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=725000, avgTime=59.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=25000, avgTime=58.13 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=378000, avgTime=58.45 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=15000, avgTime=57.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=170000, avgTime=58.11 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1495000, avgTime=59.87 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=232000, avgTime=63.0 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=184000, avgTime=57.89 (times in nanos)

totalLogs=1000000, minTime=0, maxTime=2600000, avgTime=65.22 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=850000, avgTime=60.5 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=150000, avgTime=63.83 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=43000, avgTime=59.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=276000, avgTime=60.02 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=457000, avgTime=61.69 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=204000, avgTime=60.44 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=154000, avgTime=63.67 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=355000, avgTime=60.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=338000, avgTime=60.44 (times in nanos)

totalLogs=1000000, minTime=0, maxTime=345000, avgTime=110.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=396000, avgTime=100.32 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=298000, avgTime=98.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1891000, avgTime=101.9 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=254000, avgTime=103.06 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1894000, avgTime=100.97 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=230000, avgTime=99.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=348000, avgTime=99.63 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=922000, avgTime=99.53 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=168000, avgTime=99.12 (times in nanos)

totalLogs=1000000, minTime=0, maxTime=686000, avgTime=107.41 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=320000, avgTime=95.58 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=248000, avgTime=94.94 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=217000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=159000, avgTime=93.62 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=155000, avgTime=95.28 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=106000, avgTime=98.57 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=370000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1836000, avgTime=96.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=212000, avgTime=98.62 (times in nanos)

Вывод:

MaxTime может быть пугающим, но я думаю, что можно с уверенностью сказать, что мы находимся в 50Nanos диапазон для опроса параллельной очереди.

2 голосов
/ 26 сентября 2011

Я полагаю, что NIO Pipe был спроектирован так, чтобы вы могли отправлять данные в канал внутри цикла выбора безопасным для потока способом, другими словами, любой поток может записывать в канал, и данные будут обрабатываться в другом экстремальном режиме. трубы, внутри петли селектора. Когда вы пишете в канал, вы делаете канал на другой стороне доступным для чтения.

1 голос
/ 26 сентября 2011

Я полагаю, что канал будет иметь лучшую задержку, так как он вполне может быть реализован с сопрограммами за кулисами.Таким образом, производитель немедленно уступает потребителю, когда доступны данные, а не когда решает планировщик потока.

Трубы в целом представляют проблему потребителя-производителя и, скорее всего, будут реализованы таким образом, чтобы оба потока взаимодействовалии не выгружаются извне.

...