Возьмите элементы до определенного персонажа и сгруппируйте их с помощью RxJava - PullRequest
3 голосов
/ 29 мая 2019

У меня есть простая установка для проблемы, но решение кажется более сложным.

Настройка: у меня есть горячая наблюдаемая, которая исходит от сканера, который испускает каждое число как отдельный элемент и R когда код завершен.

Проблема: Из этого я хочу получить наблюдаемую в горячем виде, которая испускает каждый полный код как 1 элемент.

RxMarbles example

Я пытался поиграть с разными операторами flatMap, takeUntil и groupBy, но не смог найти решение.

1 Ответ

2 голосов
/ 29 мая 2019

Вы можете использовать оператор buffer .

PublishSubject<Token<Integer>> s = PublishSubject.create();

Observable<Token<Integer>> markers = s.filter(x->x.isMarker());

s.buffer(markers).subscribe(
    v->{
        Optional<Integer> reduce = v.stream()
            .filter(t->!t.isMarker())
            .map(t->(ValueToken<Integer>)t)
            .map(ValueToken::get)
            .reduce((a,b)->a+b);
        reduce.ifPresent(System.out::println);
    }
);

s.onNext(value(12));
s.onNext(value(13));
s.onNext(marker()); // will emit 25

s.onNext(value(10));
s.onNext(value(7));
s.onNext(marker()); // will emit 17

s.onNext(value(10));
s.onNext(value(7)); // Not emitting yet

Я создал класс для переноса значений и маркеров в потоке.

public abstract class Token<T> {
    private static final MarkerToken MARKER = new MarkerToken<>();

    public boolean isMarker() {
        return false;
    }

    public static <T> MarkerToken<T> marker() {
        return MARKER;
    }

    public static <T> ValueToken<T> value(T o) {
        return new ValueToken<>(o);
    }

    public static class ValueToken<T> extends Token<T> {
        T value;

        public ValueToken(T value) {
            this.value = value;
        }

        public T get() {
            return value;
        }
    }

    public static class MarkerToken<T> extends Token<T> {
        public boolean isMarker() {
            return true;
        }
    }

}

обновление (с использованием сканирования)

Предыдущий метод генерировал бы также при закрытии потока, с этим решением вы можете создать только полных буферов.

Класс сообщения функционирует как аккумулятор, он будет накапливать токены до тех пор, пока не будет накоплен маркер закрытия.

Когда это происходит, сообщение next начинается с нуля.

Наличие закрывающей метки в качестве последнего элемента отмечает сообщение как завершенное.

public static class Message<T> {
    List<Token<T>> tokens = new ArrayList<>();

    public Message<T> append(Token<T> t) {

        Message<T> mx = new Message<T>();
        if(!isComplete()) {
            mx.tokens.addAll(tokens);
        }
        mx.tokens.add(t);
        return mx;
    }

    public boolean isComplete() {
        int n = tokens.size();
        return n>0 && tokens.get(n-1).isMarker();
    }

    public Optional<List<Token<T>>> fullMessage(){
        return isComplete() ? Optional.of(tokens):Optional.empty(); 
    }
}

Сканируя источник отправляемого сообщения для каждого отправленного токена, затем вы отфильтровываете неполное сообщение и отправляете только тот, который помечен как завершенный.

    s.scan(new Message<Integer>(), (a, b) -> a.append(b))
        .filter(Message::isComplete)
        .map(Message::fullMessage)
        .map(Optional::get).subscribe(v -> {
            System.out.println(v);
        });

    s.onNext(value(12));
    s.onNext(value(13));
    s.onNext(marker());// [V(12), V(13), MARKER]

    s.onNext(value(10));
    s.onNext(value(7));
    s.onNext(marker()); // [V(10), V(7), MARKER]



    s.onNext(value(10));
    s.onNext(value(127));

    s.onComplete(); // Not emitting incomplete messages on the closing of the subject.
...