Я думаю, что вы уже упомянули все возможные варианты. Нет другого конструктивного способа сделать то, что вам нужно. Во-первых, вам придется использовать исходный поток. Затем создайте новый поток, получите блокировку и используйте этот новый поток (таким образом, вызывая вашу заблокированную операцию). Наконец, создайте еще более новый поток, снимите блокировку и продолжайте обработку этого более нового потока.
Из всех вариантов, которые вы рассматриваете, я бы использовал третий, потому что количество элементов, которые он может обрабатывать, ограничено только памятью, то есть не имеет неявного ограничения максимального размера, как, например, ArrayList
имеет (может содержать около Integer.MAX_VALUE
элементов).
Излишне говорить, что это будет довольно дорогая операция, как в отношении времени, так и в отношении пространства. Вы могли сделать это следующим образом:
Stream<V> temp = Stream.of(objects)
.map(this::preProcess)
.collect(Stream::<V>builder,
Stream.Builder::accept,
(b1, b2) -> b2.build().forEach(b1))
.build();
synchronized (lockObj) {
temp = temp
.map(this::doLockedProcessing)
.collect(Stream::<V>builder,
Stream.Builder::accept,
(b1, b2) -> b2.build().forEach(b1))
.build();
}
temp.map(this::postProcess).forEach(System.out::println);
Обратите внимание, что я использовал один Stream
экземпляр temp
, так что промежуточные потоки (и их компоновщики) могут при необходимости собираться мусором.
Как подсказывает @Eugene в комментариях, было бы неплохо иметь служебный метод, чтобы избежать дублирования кода. Вот такой метод:
public static <T> Stream<T> copy(Stream<T> source) {
return source.collect(Stream::<T>builder,
Stream.Builder::accept,
(b1, b2) -> b2.build().forEach(b1))
.build();
}
Тогда вы можете использовать этот метод следующим образом:
Stream<V> temp = copy(Stream.of(objects).map(this::preProcess));
synchronized (lockObj) {
temp = copy(temp.map(this::doLockedProcessing));
}
temp.map(this::postProcess).forEach(System.out::println);