Многоразовая оболочка / объект одного экземпляра в карте потоков Java - PullRequest
9 голосов
/ 05 июня 2019

Похоже, что на этот вопрос уже должен быть ответ, но я не смог найти дубликат.

В любом случае, мне интересно, что сообщество думает о Stream.map сценарии использования, как этот?

Wrapper wrapper = new Wrapper();
list.stream()
    .map( s -> {
        wrapper.setSource(s);
        return wrapper;
    } )
    .forEach( w -> processWrapper(w) );

public static class Source {
    private final String name;

    public Source(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }
}

public static class Wrapper {
    private Source source = null;

    public void setSource(Source source) {
        this.source = source;
    }

    public String getName() {
        return source.getName();
    }
}

public void processWrapper(Wrapper wrapper) {
}

Я не большой поклонник такого использования map, но он потенциально может помочь с производительностью при работе с большими потоками и избежать создания ненужных Wrapper для каждого Source.

Это определенно имеет свои ограничения, например, почти бесполезен для параллельных потоков и работы терминала, например collect.

Обновление - Вопрос не в том, «как это сделать», а в том, «могу ли я сделать это таким образом». Например, у меня может быть код, который работает только с Wrapper, и я хочу вызвать его в forEach, но хочу избежать создания его нового экземпляра для каждого элемента Source.

Результаты тестов

Показывает улучшение 8 с многоразовой оберткой-

Бенчмарк (N) Режим Cnt Оценка Единицы ошибки

BenchmarkTest.noReuse 10000000 avgt 5 870.253 ± 122.495 мс / оп

BenchmarkTest.withReuse 10000000 avgt 5 113.694 ± 2.528 мс / оп

Код эталона -

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms2G", "-Xmx2G"})
public class BenchmarkTest {

    @Param({"10000000"})
    private int N;

    private List<Source> data;

    public static void main(String[] args) throws Exception {
        Options opt = new OptionsBuilder()
            .include(BenchmarkTest.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(opt).run();
    }

    @Setup
    public void setup() {
        data = createData();
    }

    @Benchmark
    public void noReuse(Blackhole bh) {
        data.stream()
            .map( s -> new Wrapper1( s.getName() ) )
            .forEach( t -> processTarget(bh, t) );
    }

    @Benchmark
    public void withReuse(Blackhole bh) {
        Wrapper2 wrapper = new Wrapper2();
        data.stream()
            .map( s -> { wrapper.setSource(s); return wrapper; } )
            .forEach( w -> processTarget(bh, w) );
    }

    public void processTarget(Blackhole bh, Wrapper t) {
        bh.consume(t);
    }

    private List<Source> createData() {
        List<Source> data = new ArrayList<>();
        for (int i = 0; i < N; i++) {
            data.add( new Source("Number : " + i) );
        }
        return data;
    }

    public static class Source {
        private final String name;

        public Source(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public interface Wrapper {
        public String getName();
    }

    public static class Wrapper1 implements Wrapper {
        private final String name;

        public Wrapper1(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public static class Wrapper2 implements Wrapper {
        private Source source = null;

        public void setSource(Source source) {
            this.source = source;
        }

        public String getName() {
            return source.getName();
        }
    }
}

Полный отчет о тестировании -

# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.noReuse
# Parameters: (N = 10000000)

# Run progress: 0.00% complete, ETA 00:03:20
# Fork: 1 of 1
# Warmup Iteration   1: 1083.656 ms/op
# Warmup Iteration   2: 846.485 ms/op
# Warmup Iteration   3: 901.164 ms/op
# Warmup Iteration   4: 849.659 ms/op
# Warmup Iteration   5: 903.805 ms/op
Iteration   1: 847.008 ms/op
Iteration   2: 895.800 ms/op
Iteration   3: 892.642 ms/op
Iteration   4: 825.901 ms/op
Iteration   5: 889.914 ms/op


Result "BenchmartTest.noReuse":
  870.253 ±(99.9%) 122.495 ms/op [Average]
  (min, avg, max) = (825.901, 870.253, 895.800), stdev = 31.812
  CI (99.9%): [747.758, 992.748] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.withReuse
# Parameters: (N = 10000000)

# Run progress: 50.00% complete, ETA 00:01:58
# Fork: 1 of 1
# Warmup Iteration   1: 113.780 ms/op
# Warmup Iteration   2: 113.643 ms/op
# Warmup Iteration   3: 114.323 ms/op
# Warmup Iteration   4: 114.258 ms/op
# Warmup Iteration   5: 117.351 ms/op
Iteration   1: 114.526 ms/op
Iteration   2: 113.944 ms/op
Iteration   3: 113.943 ms/op
Iteration   4: 112.930 ms/op
Iteration   5: 113.124 ms/op


Result "BenchmarkTest.withReuse":
  113.694 ±(99.9%) 2.528 ms/op [Average]
  (min, avg, max) = (112.930, 113.694, 114.526), stdev = 0.657
  CI (99.9%): [111.165, 116.222] (assumes normal distribution)


# Run complete. Total time: 00:03:40

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                     (N)  Mode  Cnt    Score     Error  Units
BenchmarkTest.noReuse    10000000  avgt    5  870.253 ± 122.495  ms/op
BenchmarkTest.withReuse  10000000  avgt    5  113.694 ±   2.528  ms/op

Ответы [ 3 ]

11 голосов
/ 08 июня 2019

Ваш подход работает, потому что потоковый конвейер состоит только из операции без сохранения состояния.В таких сочетаниях последовательная оценка потока может обрабатывать по одному элементу за раз, поэтому доступ к экземплярам оболочки не перекрывается, как показано на рисунке .Но обратите внимание, что это не гарантированное поведение.

Это определенно не работает с такими операциями с состоянием, как sorted и distinct.Он также не может работать с операциями сокращения, поскольку они всегда должны содержать как минимум два элемента для обработки, которые включают reduce, min и max.В случае collect это зависит от конкретного Collector.forEachOrdered не будет работать с параллельными потоками из-за необходимой буферизации.

Обратите внимание, что параллельная обработка будет проблематичной, даже если вы используете TheadLocal для создания оберток, ограниченных потоками, так как нет гарантии, что объектысозданные в одном рабочем потоке, остаются локальными для этого потока.Рабочий поток может передать частичный результат другому потоку, прежде чем подхватить другую, не связанную рабочую нагрузку.

Таким образом, эта общая изменяемая оболочка работает с определенным набором операций без сохранения состояния, таких как map, filter, forEach, findFirst/Any, all/any/noneMatch, в последовательном выполнении конкретной реализации.Вы не получаете гибкость API, поскольку вам нужно ограничить себя, не можете передать поток в произвольный код, ожидающий Stream, и не использовать произвольные Collector реализации.У вас также нет инкапсуляции интерфейса, так как вы предполагаете конкретное поведение реализации.

Другими словами, если вы хотите использовать такую ​​изменяемую оболочку, вам лучше использовать цикл, реализующий конкретныйоперация.У вас уже есть недостатки такой ручной реализации, так почему бы не реализовать ее, чтобы иметь преимущества.


Другой аспект, который следует учитывать, - это то, что вы получаете от повторного использования такой изменяемой оболочки.Он работает только в циклах, когда временный объект может быть оптимизирован после применения Escape-анализа в любом случае.В таких сценариях повторное использование объектов, продление срока их службы может фактически ухудшить производительность.

Конечно, масштабирование объектов не является гарантированным поведением.Могут быть сценарии, такие как длинный поток, превышающий предел встраивания JVM, когда объекты не удаляются.Но все же временные объекты не обязательно дороги.

Это было объяснено в этом ответе .Временные объекты выделяются дешево.Основные затраты на сборку мусора связаны с объектами, которые еще живы.Они должны быть пройдены, и их необходимо перемещать, освобождая место для новых распределений.Негативное влияние временных объектов заключается в том, что они могут сократить время между циклами сбора мусора.Но это зависит от скорости выделения и доступного пространства выделения, так что это действительно проблема, которую можно решить, добавив больше оперативной памяти.Больше ОЗУ означает больше времени между циклами GC и больше мертвых объектов, когда происходит GC, что делает чистые затраты GC меньшими.

Тем не менее, недопустимо чрезмерное распределение временных объектов,Существование IntStream, LongStream и DoubleStream показывает это.Но они особенные, так как использование примитивных типов является жизнеспособной альтернативой использованию объектов-оболочек без недостатков повторного использования изменяемой оболочки.Это также отличается тем, что применяется к задачам, в которых тип примитива и тип оболочки семантически эквивалентны.Напротив, вы хотите решить проблему, когда для операции требуется тип оболочки.К примитивному потоку также относится, когда вам нужны объекты для вашей задачи, нет способа обойти бокс, который будет создавать отдельные объекты для разных значений, не разделяя изменяемый объект.

Так что если вы симСуществует проблема, когда существует семантически эквивалентная альтернатива, позволяющая избежать обертки-объекта без существенных проблем, например, просто используя Comparator.comparingInt вместо Comparator.comparing, где это возможно, вы все равно можете предпочесть его.Но только тогда.


Короче говоря, в большинстве случаев экономия от повторного использования такого объекта, если таковая имеется, не будет оправдывать недостатки.В особых случаях, когда это выгодно и важно, вам может быть лучше использовать цикл или любую другую конструкцию под вашим полным контролем, вместо использования Stream.

9 голосов
/ 08 июня 2019

Вы можете иметь несколько удобных функций, а также иметь поточно-ориентированную версию для работы с параллелью.

Function<T,U> threadSafeReusableWrapper(Supplier<U> newWrapperInstanceFn, BiConsumer<U,T> wrapFn) {
   final ThreadLocal<T> wrapperStorage = ThreadLocal.withInitial(newWrapperInstanceFn);
   return item -> {
      T wrapper = wrapperStorage.get();
      wrapFn.consume(wrapper, item);
      return wrapper;
   }
}

Function<T,U> reusableWrapper(U wrapper, BiConsumer<U,T> wrapFn) {
   return item -> {
      wrapFn.consume(wrapper, item);
      return wrapper;
   };
}

list.stream()
    .map(reusableWrapper(new Wrapper(), Wrapper::setSource))
    .forEach( w -> processWrapper(w) );
list.stream()
    .map(threadSafeReusableWrapper(Wrapper::new, Wrapper::setSource))
     .parallel()
    .forEach( w -> processWrapper(w) );

Однако я не думаю, что оно того стоит.Эти обертки недолговечны, поэтому вряд ли оставят молодое поколение, поэтому мусор будет собираться очень быстро .Хотя я думаю, что эту идею стоит проверить с помощью библиотеки микро-эталонов JMH

4 голосов
/ 08 июня 2019

Хотя это возможно, обращение к объекту вне потока делает код менее функциональным по стилю. Очень близкий эквивалент, который лучше инкапсулирован, может быть достигнут просто с помощью вспомогательной функции:

public class Context {

    private static final Wrapper WRAPPER = new Wrapper();

    private static void helper(Source source) {
        WRAPPER.setSource(source);
        processWrapper(WRAPPER);
    }

    public static void main(String[] args) {
        List<Source> list = Arrays.asList(new Source("Foo"), new Source("Baz"), new Source("Bar"));
        list.stream().forEach(Context::helper);
}
...