Когда вы закрываете подписчика, чей эмиссор является Наблюдаемым с отскоком? - PullRequest
0 голосов
/ 12 мая 2018

Я новичок в Java RX, и мне было интересно, если это возможно.

У меня есть данные, поступающие с контроллера.

И он делает что-то вроде этого:

observable.asObservable().debounce(10, SECONDS).subscribe(mySubscriber);

final Subscriber<Long> mySubscriber = new Subscriber<Long>() {

        @Override
        public void onCompleted() {
            System.out.println("killing the subscriber");
        }

        @Override
        public void onError(final Throwable throwable) {
        }

        @Override
        public void onNext(final Long number) {
            //some stuff
        }
    };

Отлично работает, debounce и onNext, но когда я перестаю отправлять события, я ждал, что последний debounce (без информации) вызовет onCompleted (), но это не так

Мне нужно установить тайм-аут после запуска последнего отката, учитывая, что событие может быть опубликовано после N времени.

Например.

Если у меня откат 10 секунд и тайм-аут 20.

Я получаю первое событие после второго 5.

Деблокирование запускается и будет ждать еще 10 секунд.

Я получаю второе событие на втором 14.

Деблокирование запущено, и я подожду до секунды 24.

Тайм-аут срабатывает в секунду 20 и убивает абонента.

В секунду 24 я должен получить событие у подписчика, но он уже был убит.

Я хочу установить тайм-аут ПОСЛЕ последнего испущенного, учитывая дебад.

1 Ответ

0 голосов
/ 17 мая 2018

Не уверен, что то, что я предлагаю, является наиболее элегантным способом, но вы можете просто записать последний полученный объект, а затем переиздать его по тайм-ауту, как это

AtomicReference<Long> lastObject = new AtomicReference<>(null);

Observable
    .interval(100, TimeUnit.MILLISECONDS)
    .doOnNext(i -> lastObject.set(i))
    .debounce(150, TimeUnit.MILLISECONDS)
    .timeout(1000, TimeUnit.MILLISECONDS, Observable.create(emitter -> {
        if (lastObject.get() != null) emitter.onNext(lastObject.get());
        emitter.onComplete();
    }))
    .subscribe(...);

В этом примере мы испускаем каждые 100 миллисекунд, и поскольку debounce выше, никакие элементы не испускаются. Поэтому, когда timeout происходит, мы переиздаем последний элемент, который мы запомнили в doOnNext до debounce.

...