RxJava собирает событие AMPQ в Observable и подписывается буфером - PullRequest
0 голосов
/ 17 мая 2018

Мне нужно собрать некоторые события AMPQ и затем печатать их каждые 10 секунд, используя буфер.

private Observable<Event> obs = Observable.empty(); 
private final Disposable disposable = obs.buffer(10, SECONDS)
                              .retry(t -> true)
                              .subscribe(System.out::println);

@Override
public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
      obs = obs.concatWith(Observable.just(event));
}

Событие - это сообщение, а void handle - это потребитель.

Я отлаживаю этот код, и он печатает только пустой список, и это имеет смысл, потому что obs пуст.

Как я могу добавить (concat?) События в этот Observable и непрерывно выполнять одноразовый ? Спасибо.

1 Ответ

0 голосов
/ 17 мая 2018

Вам нужен Тема , на которую вы можете подписаться.Новые элементы могут быть добавлены в тему с помощью next(T element)

private Subject<Event> subject = ReplaySubject.create();

@Override
public void handle(final Event event, final MessageContext context) throws MessageConsumptionException {
     subject.next(event);
}

public Observable<Event> getObservable() {
    subject.asObservable();
}

. Вы можете подписаться на наблюдаемое, которое возвращается методом getObservable().

...