Я пытаюсь создать приложение с использованием RX Java, которое должно прослушивать брокер сообщений. Этот слушатель всегда должен быть включен и не должен блокировать основной поток.
Я использую следующий код для создания наблюдаемой, которая испускает сообщения, как только получает их.
public Observable<Message<byte[]>> getObservable(){
return Observable.create((ObservableOnSubscribe<Message<byte[]>>) subscriber -> {
if (!subscriber.isDisposed()) {
Scheduler.Worker deliveryWorker = **Schedulers.io().createWorker()**;
deliveryWorker.schedule(() -> {
Message<byte[]> receivedMessages = destination.receive();
subscriber.onNext(receivedMessages );
});
}
})
.**observeOn(Schedulers.newThread())**;
} }
Теперь, когда я выполняю код, я вижу, что программа работает секунду, а затем завершается.
Я ожидаю, что это будет выполняться вечно, так как subscriber.onComplete () никогда не вызывается. Я что-то упускаю, что нужно сделать, чтобы этот работник работал вечно и получал Сообщения ?
Что мне здесь не хватает? Что нужно сделать, чтобы наблюдаемое начало работать вечно?
Я провел свой тест в Eclipse и вижу, что после выполнения метода main ни один из потоков не жив;
Однако, если я изменю наблюдаемое на наблюдаемое блокирование, используя следующий код, процесс не завершается и выполняется вечно.
getObservable().blockingSubscribe(message->{print(message);})
doSomething(); // this code is not executing
Но в то же время это блокирует выполнение основного потока. Это не то, что я хочу.
Каким может быть решение моей проблемы?