Пользовательские операции, не вписывающиеся в функциональный API, могут быть реализованы через интерфейс Spliterator
.
Чтобы привести простой пример, следующая операция объединит элемент String
сего предыдущий элемент, если не- null
:
public static Stream<String> concatWithPrevious(Stream<String> source) {
boolean parallel = source.isParallel();
Spliterator<String> sp = source.spliterator();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<String>(
sp.estimateSize(),
sp.characteristics()&~(Spliterator.DISTINCT|Spliterator.SORTED)) {
private String previous;
@Override
public boolean tryAdvance(Consumer<? super String> action) {
return sp.tryAdvance(s -> {
String p = previous;
previous = s;
action.accept(p == null? s: s == null? p: p.concat(s));
});
}
}, parallel).onClose(source::close);
}
Центральным элементом является метод tryAdvance
, который должен вызвать метод Consumer
accept
со следующим элементом и вернуть true
, если он есть, или просто вернуть false
, если достигнут конец потока.
Есть также характеристики и предполагаемый размер (который будет точным размером, когда SIZED
имеется характеристика), которую приведенный выше пример в основном берет из сплитератора исходного потока.Я оставляю это в качестве упражнения для читателя, почему здесь удаляются характеристики DISTINCT
и SORTED
, когда они присутствуют для исходного потока.
Параллельная обработка будет включена с помощью метода trySplit
, который будетнаследоваться от AbstractSpliterator
здесь.Этот метод будет выполнять буферизацию элементов в массив, что не очень эффективно, но для сплитератора, имеющего такую зависимость от предыдущего элемента, это лучшее, что мы можем получить.
Когда мы запустим этоНапример, с
concatWithPrevious(
IntStream.range('A', 'Z')
.mapToObj(i -> String.valueOf((char)i))
.peek(s -> System.out.println("source stream: "+s))
)
.filter(Predicate.isEqual("EF"))
.findFirst()
.ifPresent(s -> System.out.println("result: "+s));
мы получаем
source stream: A
source stream: B
source stream: C
source stream: D
source stream: E
source stream: F
result: EF
, демонстрирующий, что лень потоков все еще сохраняется.
Взяв в качестве примера эскиз вашей задачи,Я хотел бы рассмотреть возможность изменения кода, например
public class NotificationProcessor {
@Autowired
private Averager averager;
@Autowired
private TrendAnalyser trendAnalyser;
private long prevNotificationTime;
public void consume(Message message, Queue<Notification> queue) {
if (message.getRate() > averager.getAverage() + THRESHOLD) {
// Generate notification A here
queue.add(…);
}
// Adjust the moving average
averager.put(message);
trendAnalyser.analyze(message);
if (trendAnalyser.isFalling()) {
Date now = new Date();
// Throttle
if (now.getTime() - prevNotificationTime > 60) {
prevNotificationTime = now.getTime();
// Generate notification B here
queue.add(…);
}
}
}
}
, и использовать его в операции потока, например
public static Stream<Notification> notificationProcessor(Stream<Message> source) {
// replace with intended factory mechanism or make it a parameter
NotificationProcessor proc = new NotificationProcessor();
boolean parallel = source.isParallel();
Spliterator<Message> sp = source.spliterator();
return StreamSupport.stream(new Spliterators.AbstractSpliterator<Notification>(
sp.estimateSize(),
sp.characteristics() & Spliterator.ORDERED | Spliterator.NONNULL) {
final Queue<Notification> queue = new ArrayDeque<>(2);
@Override
public boolean tryAdvance(Consumer<? super Notification> action) {
while(queue.isEmpty()) {
if(!sp.tryAdvance(msg -> proc.consume(msg, queue))) {
return false;
}
}
action.accept(queue.remove());
return true;
}
}, parallel).onClose(source::close);
}
Поскольку каждый элемент источника может генерировать от нуля до двух элементов, не можетбыть характеристикой SIZED
, на самом деле я решил быть здесь консервативным и сохранить только характеристику ORDERED
, которая, как вы сказали, относится к вашей операции, и добавила NONNULL
, которая кажется подходящей для вашего кода.
Поскольку каждый вызов tryAdvance
должен обеспечивать ровно один элемент или не иметь элемента только при достижении конца потока, существует очередь из двух элементовэто нужно¹.Если очередь пуста, источник будет запрашиваться до тех пор, пока не будет хотя бы один элемент или пока не будет достигнут конец источника.Затем следующий элемент будет передан потребителю, если в очереди есть элемент.
here Здесь мы можем работать с очередью размером один, сразу же потребляя первый ожидающий элемент без очереди, но это значительно усложнит код