Я новичок в использовании Java RX и столкнулся с проблемой, надеюсь, кто-нибудь подскажет, что я делаю неправильно.
Вопрос:
Есть много событий, которые я отслеживаю, такие события запускаются так:
Observable<Long> otherObservable = Observable.empty();
public void myMethod(){
Observable<Long> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS);
final Subscriber<Long> timeSubscriber = new Subscriber<Long>() {
@Override
public void onCompleted() {
// nothing really
}
@Override
public void onError(final Throwable throwable) {
// nothing really
}
@Override
public void onNext(final Long number) {
// Here i do something
}
};
return Observable.merge(timerObservable, otherObservable)
.first()
.subscribe(timeSubscriber);
}
Таким образом, в основном это вызывает событие после VARIABLE_TIME.
Это прекрасно работает, но теперь я сталкиваюсь с тем фактом, что у меня слишком много событий.
Так что я подумал об использовании debounce и buffer.
Я пытаюсь сделать следующее:
По-прежнему создается множество наблюдаемых, которые генерируют событие через N секунд.
Сбор информации от каждого из них (длинная или, может быть, строка)
По истечении времени задержки (времени буфера) Отправьте подписчику список со всей собранной информацией.
Пока я сделал это:
Observable<List<Long>> otherObservable = Observable.empty();
otherObservable.debounce(10L, SECONDS).buffer(20L, SECONDS);
Observable<List<Long>> observable1 = Observable.timer(VARIABLE_TIME, TimeUnit.SECONDS).buffer(1);
Subscriber<List<Long> > observerSuscriber = new Subscriber<List<Long>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(final Throwable throwable) {
}
@Override
public void onNext(final List<Long> ids ) {
// do something here
}
};
Observable.merge(otherObservable, observable1)
.first()
.subscribe(observerSuscriber);
Но вот так я все равно получаю сообщение сразу после отправки.
Мне интересно, есть ли еще способ сделать это? Есть идеи? Я использую Java RX 1.2