Вы можете использовать оператор buffer .
PublishSubject<Token<Integer>> s = PublishSubject.create();
Observable<Token<Integer>> markers = s.filter(x->x.isMarker());
s.buffer(markers).subscribe(
v->{
Optional<Integer> reduce = v.stream()
.filter(t->!t.isMarker())
.map(t->(ValueToken<Integer>)t)
.map(ValueToken::get)
.reduce((a,b)->a+b);
reduce.ifPresent(System.out::println);
}
);
s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker()); // will emit 25
s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker()); // will emit 17
s.onNext(value(10));
s.onNext(value(7)); // Not emitting yet
Я создал класс для переноса значений и маркеров в потоке.
public abstract class Token<T> {
private static final MarkerToken MARKER = new MarkerToken<>();
public boolean isMarker() {
return false;
}
public static <T> MarkerToken<T> marker() {
return MARKER;
}
public static <T> ValueToken<T> value(T o) {
return new ValueToken<>(o);
}
public static class ValueToken<T> extends Token<T> {
T value;
public ValueToken(T value) {
this.value = value;
}
public T get() {
return value;
}
}
public static class MarkerToken<T> extends Token<T> {
public boolean isMarker() {
return true;
}
}
}
обновление (с использованием сканирования)
Предыдущий метод генерировал бы также при закрытии потока, с этим решением вы можете создать только полных буферов.
Класс сообщения функционирует как аккумулятор, он будет накапливать токены до тех пор, пока не будет накоплен маркер закрытия.
Когда это происходит, сообщение next начинается с нуля.
Наличие закрывающей метки в качестве последнего элемента отмечает сообщение как завершенное.
public static class Message<T> {
List<Token<T>> tokens = new ArrayList<>();
public Message<T> append(Token<T> t) {
Message<T> mx = new Message<T>();
if(!isComplete()) {
mx.tokens.addAll(tokens);
}
mx.tokens.add(t);
return mx;
}
public boolean isComplete() {
int n = tokens.size();
return n>0 && tokens.get(n-1).isMarker();
}
public Optional<List<Token<T>>> fullMessage(){
return isComplete() ? Optional.of(tokens):Optional.empty();
}
}
Сканируя источник отправляемого сообщения для каждого отправленного токена, затем вы отфильтровываете неполное сообщение и отправляете только тот, который помечен как завершенный.
s.scan(new Message<Integer>(), (a, b) -> a.append(b))
.filter(Message::isComplete)
.map(Message::fullMessage)
.map(Optional::get).subscribe(v -> {
System.out.println(v);
});
s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker());// [V(12), V(13), MARKER]
s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker()); // [V(10), V(7), MARKER]
s.onNext(value(10));
s.onNext(value(127));
s.onComplete(); // Not emitting incomplete messages on the closing of the subject.