Запуск вечного фонового процесса с использованием RX Java - PullRequest
0 голосов
/ 28 июня 2018

Я пытаюсь создать приложение с использованием RX Java, которое должно прослушивать брокер сообщений. Этот слушатель всегда должен быть включен и не должен блокировать основной поток.

Я использую следующий код для создания наблюдаемой, которая испускает сообщения, как только получает их.

public Observable<Message<byte[]>>  getObservable(){ 
return Observable.create((ObservableOnSubscribe<Message<byte[]>>) subscriber -> {
        if (!subscriber.isDisposed()) { 
                Scheduler.Worker deliveryWorker = **Schedulers.io().createWorker()**;  
                deliveryWorker.schedule(() -> {
                Message<byte[]>  receivedMessages = destination.receive();
              subscriber.onNext(receivedMessages );
                  });

        }

    })   
 .**observeOn(Schedulers.newThread())**;
} }

Теперь, когда я выполняю код, я вижу, что программа работает секунду, а затем завершается.

Я ожидаю, что это будет выполняться вечно, так как subscriber.onComplete () никогда не вызывается. Я что-то упускаю, что нужно сделать, чтобы этот работник работал вечно и получал Сообщения ?

Что мне здесь не хватает? Что нужно сделать, чтобы наблюдаемое начало работать вечно?

Я провел свой тест в Eclipse и вижу, что после выполнения метода main ни один из потоков не жив;

Однако, если я изменю наблюдаемое на наблюдаемое блокирование, используя следующий код, процесс не завершается и выполняется вечно.

 getObservable().blockingSubscribe(message->{print(message);})
 doSomething(); // this code is not executing 

Но в то же время это блокирует выполнение основного потока. Это не то, что я хочу.

Каким может быть решение моей проблемы?

...