Какой самый эффективный способ сделать копию потока? - PullRequest
0 голосов
/ 22 января 2019

У меня есть метод, который выполняет обработку в потоке. Часть этой обработки должна быть выполнена под контролем блокировки - одной заблокированной секции для обработки всех элементов - но некоторые из этого не делают (и не должны быть, потому что это может быть довольно много времени). Поэтому я не могу просто сказать:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess);
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing);
}
toPostProcess.map(this::postProcess).forEach(System.out::println);

потому что вызовы doLockedProcessing будут выполняться только тогда, когда вызывается операция терминала forEach, и это за пределами блокировки.

Так что я думаю, что мне нужно сделать копию потока, используя терминальную операцию, на каждом этапе, чтобы правильные биты были сделаны в нужное время. Что-то вроде:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess).copy();
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing).copy();
}
toPostProcess.map(this::postProcess).forEach(System.out::println);

Конечно, метод copy() не существует, но если он это сделает, он выполнит терминальную операцию над потоком и вернет новый поток, содержащий все те же элементы.

Мне известно о нескольких способах достижения этого:

(1) Через массив (не так просто, если тип элемента является универсальным):

copy = Stream.of(stream.toArray(String[]::new));

(2) Через список:

copy = stream.collect(Collectors.toList()).stream();

(3) Через построитель потока:

Stream.Builder<V> builder = Stream.builder();
stream.forEach(builder);
copy = builder.build();

Что я хочу знать: какой из этих методов наиболее эффективен с точки зрения времени и памяти? Или есть другой способ, который лучше?

Ответы [ 3 ]

0 голосов
/ 22 января 2019

Wrap doLockedProcessing это синхронно. Вот один из способов:

class SynchronizedFunction<T, R> {
    private Function<T, R> function;
    public SynchronizedFunction(Function<T, R> function) {
        this.function = function;
    }
    public synchronized R apply(T t) {
        return function.apply(t);
    }
}

Затем используйте это в своем потоке:

stream.parellel()
  .map(this:preProcess)
  .map(new SynchronizedFunction<>(this::doLockedProcessing))
  .forEach(this::postProcessing)

Это будет последовательно обрабатывать заблокированный код, но в противном случае будет parellel.

0 голосов
/ 23 января 2019

Я создал контрольный тест, который сравнивает три метода.Это предполагает, что использование List в качестве промежуточного хранилища примерно на 30% медленнее, чем использование массива или Stream.Builder, которые похожи.Поэтому я обращаюсь к использованию Stream.Builder, потому что преобразование в массив сложно, когда тип элемента является универсальным типом.

В итоге я написал небольшую функцию, которая создает Collector, которая используетStream.Builder в качестве промежуточного хранилища:

private static <T> Collector<T, Stream.Builder<T>, Stream<T>> copyCollector()
{
    return Collector.of(Stream::builder, Stream.Builder::add, (b1, b2) -> {
        b2.build().forEach(b1);
        return b1;
    }, Stream.Builder::build);
}

Затем я могу сделать копию любого потока str, выполнив str.collect(copyCollector()), что вполне соответствует идиоматическому использованию потоков.

Исходный код, который я разместил, будет выглядеть так:

Stream<V> preprocessed = Stream.of(objects).map(this::preProcess).collect(copyCollector());
Stream<V> toPostProcess;
synchronized (lockObj) {
    toPostProcess = preprocessed.map(this::doLockedProcessing).collect(copyCollector());
}
toPostProcess.map(this::postProcess).forEach(System.out::println);
0 голосов
/ 22 января 2019

Я думаю, что вы уже упомянули все возможные варианты. Нет другого конструктивного способа сделать то, что вам нужно. Во-первых, вам придется использовать исходный поток. Затем создайте новый поток, получите блокировку и используйте этот новый поток (таким образом, вызывая вашу заблокированную операцию). Наконец, создайте еще более новый поток, снимите блокировку и продолжайте обработку этого более нового потока.

Из всех вариантов, которые вы рассматриваете, я бы использовал третий, потому что количество элементов, которые он может обрабатывать, ограничено только памятью, то есть не имеет неявного ограничения максимального размера, как, например, ArrayList имеет (может содержать около Integer.MAX_VALUE элементов).

Излишне говорить, что это будет довольно дорогая операция, как в отношении времени, так и в отношении пространства. Вы могли сделать это следующим образом:

Stream<V> temp = Stream.of(objects)
        .map(this::preProcess)
        .collect(Stream::<V>builder,
                 Stream.Builder::accept,
                 (b1, b2) -> b2.build().forEach(b1))
        .build();

synchronized (lockObj) {
    temp = temp
            .map(this::doLockedProcessing)
            .collect(Stream::<V>builder,
                     Stream.Builder::accept,
                     (b1, b2) -> b2.build().forEach(b1))
            .build();
}

temp.map(this::postProcess).forEach(System.out::println);

Обратите внимание, что я использовал один Stream экземпляр temp, так что промежуточные потоки (и их компоновщики) могут при необходимости собираться мусором.


Как подсказывает @Eugene в комментариях, было бы неплохо иметь служебный метод, чтобы избежать дублирования кода. Вот такой метод:

public static <T> Stream<T> copy(Stream<T> source) {
    return source.collect(Stream::<T>builder,
                          Stream.Builder::accept,
                          (b1, b2) -> b2.build().forEach(b1))
                 .build();
}

Тогда вы можете использовать этот метод следующим образом:

Stream<V> temp = copy(Stream.of(objects).map(this::preProcess));

synchronized (lockObj) {
    temp = copy(temp.map(this::doLockedProcessing));
}

temp.map(this::postProcess).forEach(System.out::println);
...