Наблюдаемый с буфером и множественными обновлениями значения в Java RX? - PullRequest
0 голосов
/ 08 мая 2018

Я новичок в использовании Java RX и столкнулся с проблемой, надеюсь, кто-нибудь подскажет, что я делаю неправильно.

Вопрос: Есть много событий, которые я отслеживаю, такие события запускаются так:

Observable<Long> otherObservable = Observable.empty();

public void myMethod(){
Observable<Long> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS);
        final Subscriber<Long> timeSubscriber = new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                // nothing really
            }

            @Override
            public void onError(final Throwable throwable) {
                // nothing really
            }

            @Override
            public void onNext(final Long number) {
                // Here i do something
            }
        };

        return Observable.merge(timerObservable, otherObservable) 
                .first()
                .subscribe(timeSubscriber);
}

Таким образом, в основном это вызывает событие после VARIABLE_TIME.

Это прекрасно работает, но теперь я сталкиваюсь с тем фактом, что у меня слишком много событий.

Так что я подумал об использовании debounce и buffer.

Я пытаюсь сделать следующее:

По-прежнему создается множество наблюдаемых, которые генерируют событие через N секунд.

Сбор информации от каждого из них (длинная или, может быть, строка)

По истечении времени задержки (времени буфера) Отправьте подписчику список со всей собранной информацией.

Пока я сделал это:

Observable<List<Long>> otherObservable = Observable.empty();

otherObservable.debounce(10L, SECONDS).buffer(20L, SECONDS);
Observable<List<Long>> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS).buffer(1);

Subscriber<List<Long> > observerSuscriber = new Subscriber<List<Long>>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(final Throwable throwable) {

            }

            @Override
            public void onNext(final List<Long> ids ) {
             // do something here
            }
        };

Observable.merge(otherObservable, observable1)
                  .first()
                  .subscribe(observerSuscriber);

Но вот так я все равно получаю сообщение сразу после отправки.

Мне интересно, есть ли еще способ сделать это? Есть идеи? Я использую Java RX 1.2

1 Ответ

0 голосов
/ 09 мая 2018

По истечении времени задержки (времени буфера) Отправьте подписчику список со всей собранной информацией.

Вы ответили на свой вопрос там!Используйте оператор buffer:

Flowable<String> stream = ...
Flowable<List<String>> lists = stream.buffer(5, TimeUnit.SECONDS);
...