Java8 Parallel Stream требует времени для суммирования значений - PullRequest
0 голосов
/ 05 июля 2018

Я практикую часть параллельного потока java8 и пишу программу, которая будет суммировать число, переданное в качестве параметра, от 0 до этого числа.

Например, если я передал 10, он суммирует числа от 1 до 10 и возвращает результат.

Ниже находится программа

public class ParellelStreamExample {



    public static void main(String[] args) {
        System.out.println("Long Range value - "+ Long.MIN_VALUE + " to "+ Long.MAX_VALUE);
        long startTime = System.nanoTime();
        long sum = sequentailSum(100000000);
        System.out.println(
                "Time in sequential execution " + (System.nanoTime() - startTime) / 1000000 + " msec with sum = " + sum);
        long startTime1 = System.nanoTime();
        long sum1 = parellelSum(100000000);
        System.out.println("Time in parallel execution " + (System.nanoTime() - startTime1) / 1000000
                + " msec with sum = " + sum1);

    }

    private static Long parellelSum(long n) {
        return Stream.iterate(1l, i -> i + 1).limit(n).parallel().reduce(0L, Long::sum);
    }

    private static Long sequentailSum(long n) {
        return Stream.iterate(1l, i -> i + 1).limit(n).reduce(0L, Long::sum);
    }
}

Вывод, который я получил,

Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 1741 msec with sum = 5000000050000000

Exception in thread "main" java.lang.OutOfMemoryError
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
    at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
    at java.util.stream.SliceOps$1.opEvaluateParallelLazy(SliceOps.java:155)
    at java.util.stream.AbstractPipeline.sourceSpliterator(AbstractPipeline.java:431)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
    at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:474)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.parellelSum(ParellelStreamExample.java:21)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.main(ParellelStreamExample.java:14)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Long.valueOf(Long.java:840)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample.lambda$0(ParellelStreamExample.java:21)
    at com.abhishek.javainaction.stream.parellel.ParellelStreamExample$$Lambda$3/250421012.apply(Unknown Source)
    at java.util.stream.Stream$1.next(Stream.java:1033)
    at java.util.Spliterators$IteratorSpliterator.trySplit(Spliterators.java:1784)
    at java.util.stream.AbstractShortCircuitTask.compute(AbstractShortCircuitTask.java:114)
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Почему эта программа не работает в параллельной части и происходит перегрузка gc, вместо этого он должен работать быстрее в параллельной части, так как он использует инфраструктуру fork / join и выполняет процесс через потоки в целом.

Что в этом не так?

Ответы [ 2 ]

0 голосов
/ 05 июля 2018

Я явно не обсуждаю недостатки тестов (;)). Кажется, что основная проблема здесь заключается в понимании использования определенных функций Stream и их поведения.

Попробуйте что-то вроде:

LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum)

но, если честно, следует адаптировать и последовательный:

LongStream.rangeClosed(1, n).reduce(0L, Long::sum)

Теперь я получил такое поведение во время выполнения:

Long Range value - -9223372036854775808 to 9223372036854775807
Time in sequential execution 90 msec with sum = 5000000050000000
Time in parallel execution 25 msec with sum = 5000000050000000

Полагаю, именно этого вы и ожидали.

Как и с любым другим API, вы должны понимать, что делают конкретные методы, особенно если вы хотите идти параллельно. Но, как вы можете видеть, даже последовательная обработка получает огромное преимущество от использования этого другого подхода.

Посмотрите на https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html#StreamOps, чтобы получить представление о типах методов.

Например, использование limit :

Аналогично, операции, которые по своей природе связаны с порядком, например, limit (), может потребоваться буферизация для обеспечения правильного упорядочения, подрывая пользу параллелизма .

0 голосов
/ 05 июля 2018

Есть несколько вещей, которые пошли не так, как надо.

  1. Вы пытаетесь сравнить код с System.nanoTime() вместо чего-то вроде JMH.
  2. Вы пытаетесь parellelize тривиальные вычисления (sum) на Long вместо использования LongStream. Если JVM не может избавиться от бокса, издержки погони за указателями могут легко перевесить преимущества параллелизма.
  3. Вы пытаетесь parellelize наследственно последовательного потока, созданного iterate. Stream Framework попытается выполнить то, что вы просите, буферизируя поток и распределяя его по нескольким потокам, что добавляет много накладных расходов.
  4. Вы используете limit в упорядоченном параллельном потоке. Это требует, чтобы потоковая структура выполняла большую дополнительную синхронизацию, чтобы гарантировать, что именно n первые элементы используются для получения результата. Вы увидите, что если вы поместите .unordered() в параллельный поток, время выполнения резко сократится, но результат будет недетерминированным, поскольку вы получите сумму некоторых n элементов, а не обязательно первые n элементы.

Правильный способ сделать это - использовать JMH и заменить iterate(...).limit(...) на LongStream.rangeClosed(1, n)

...