Похоже, это связано со старой проблемой « Почему filter () после flatMap ()« не полностью »ленив в Java потоках? ». Хотя эта проблема была исправлена для встроенных операций Stream, она, кажется, все еще существует, когда мы пытаемся выполнить итерацию по потоку с плоским отображением извне.
Мы можем упростить код, чтобы воспроизвести проблему до
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
.flatMapToLong(x -> x)
.iterator().hasNext();
Обратите внимание, что использование Spliterator
также влияет на
Stream.of(LongStream.range(0, Integer.MAX_VALUE))
.flatMapToLong(x -> x)
.spliterator()
.tryAdvance((long l) -> System.out.println("first item: "+l));
Оба пытаются буферизовать элементы до тех пор, пока в конечном итоге не выйдут с помощью OutOfMemoryError
.
, поскольку spliterator().forEachRemaining(…)
, похоже, не В этом случае вы можете реализовать решение, которое будет работать для вашего варианта использования forEach
, но это будет fr agile, поскольку оно по-прежнему будет создавать проблему для коротких замыканий потоковых операций.
public static <T> Stream<List<T>> buffer(Stream<T> stream, final int count) {
boolean parallel = stream.isParallel();
Spliterator<T> source = stream.spliterator();
return StreamSupport.stream(
new Spliterators.AbstractSpliterator<List<T>>(
(source.estimateSize()+count-1)/count, source.characteristics()
&(Spliterator.SIZED|Spliterator.DISTINCT|Spliterator.ORDERED)
| Spliterator.NONNULL) {
List<T> list;
Consumer<T> c = t -> list.add(t);
@Override
public boolean tryAdvance(Consumer<? super List<T>> action) {
if(list == null) list = new ArrayList<>(count);
if(!source.tryAdvance(c)) return false;
do {} while(list.size() < count && source.tryAdvance(c));
action.accept(list);
list = null;
return true;
}
@Override
public void forEachRemaining(Consumer<? super List<T>> action) {
source.forEachRemaining(t -> {
if(list == null) list = new ArrayList<>(count);
list.add(t);
if(list.size() == count) {
action.accept(list);
list = null;
}
});
if(list != null) {
action.accept(list);
list = null;
}
}
}, parallel);
}
Но обратите внимание, что решения на основе Spliterator
в целом предпочтительнее, так как они поддерживают передачу дополнительной информации, позволяющей оптимизировать работу, и во многих случаях используют более низкие затраты на итерацию. Так что это путь к go после исправления этой проблемы в коде JDK.
В качестве обходного пути вы можете использовать Stream.concat(…)
для объединения потоков, но у него есть явное предупреждение о том, что не нужно объединять слишком много потоков одновременно в его документации :
Будьте осторожны при построении потоков из повторной конкатенации. Доступ к элементу глубоко конкатенированного потока может привести к глубоким цепочкам вызовов или даже к StackOverflowException
[sic].
Имя throwable было исправлено до StackOverflowError
в документации Java 9