Как правило, пользовательские операции должны иметь дело с интерфейсом Spliterator
. Он расширяет концепцию Iterator
, добавляя характеристики и информацию о размере, а также возможность разделять часть элементов в качестве другого сплитератора (отсюда и его название). Это также упрощает итерационную логику, поскольку требуется только один метод.
public static <T> Stream<T> takeWhile(Stream<T> s, Predicate<? super T> condition) {
boolean parallel = s.isParallel();
Spliterator<T> spliterator = s.spliterator();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
spliterator.estimateSize(),
spliterator.characteristics()&~(Spliterator.SIZED|Spliterator.SUBSIZED)) {
boolean active = true;
Consumer<? super T> current;
Consumer<T> adapter = t -> {
if((active = condition.test(t))) current.accept(t);
};
@Override
public boolean tryAdvance(Consumer<? super T> action) {
if(!active) return false;
current = action;
try {
return spliterator.tryAdvance(adapter) && active;
}
finally {
current = null;
}
}
}, parallel).onClose(s::close);
}
Чтобы сохранить свойства потока, мы сначала запрашиваем параллельное состояние, чтобы восстановить его для нового потока. Кроме того, мы регистрируем действие закрытия, которое закроет исходный поток.
Основная работа заключается в реализации Spliterator
, декорирующего разделитель предыдущего состояния потока.
Характеристики сохраняются, за исключением SIZED
и SUBSIZED
, так как наша операция приводит к непредсказуемому размеру. Оригинальный размер все еще передается, теперь он будет использоваться в качестве оценки.
Это решение сохраняет Consumer
, переданное tryAdvance
на время операции, чтобы иметь возможность использовать одного и того же потребителя адаптера, избегая создания нового для каждой итерации. Это работает, поскольку гарантируется, что tryAdvance
никогда не вызывается одновременно.
Параллелизм осуществляется через разбиение, которое наследуется от AbstractSpliterator
. Эта унаследованная реализация будет буферизовать некоторые элементы, что является разумным, поскольку реализация лучшей стратегии для такой операции, как takeWhile
, действительно сложна.
Так что вы можете использовать его как
takeWhile(Stream.of("foo", "bar", "baz", "hello", "world"), s -> s.length() == 3)
.forEach(System.out::println);
который напечатает
foo
bar
baz
или
takeWhile(Stream.of("foo", "bar", "baz", "hello", "world")
.peek(s -> System.out.println("before takeWhile: "+s)), s -> s.length() == 3)
.peek(s -> System.out.println("after takeWhile: "+s))
.forEach(System.out::println);
который напечатает
before takeWhile: foo
after takeWhile: foo
foo
before takeWhile: bar
after takeWhile: bar
bar
before takeWhile: baz
after takeWhile: baz
baz
before takeWhile: hello
, который показывает, что он не обрабатывает больше, чем необходимо. Перед этапом takeWhile
мы должны встретиться с первым несовпадающим элементом, после этого мы встречаемся только с элементами до этого.