Подписаться на Observable ничего не делает - PullRequest
0 голосов
/ 02 июля 2019

Я пытаюсь понять, как выполняются Observables, но не могу заставить этот простой код работать.

public class RxJavaExample {
    public static void main(String[] args) {
        Observable<String> hello = Observable.fromCallable(() -> 
            getHello()).subscribeOn(Schedulers.newThread());

        hello.subscribe();

        System.out.println("End of main!");
    }

    public static String getHello() {
        System.out.println("Hello called in " + 
            Thread.currentThread().getName());
        return "Hello";
    }
}

Не должен hello.subscribe() выполнять getHello()?

Ответы [ 3 ]

2 голосов
/ 02 июля 2019

Это потому, что ваш основной поток завершает работу до того, как фоновый поток достигнет getHello.Попробуйте добавить Thread.sleep(5000) в ваш метод main перед выходом.

Либо подождите, пока не будет вызван onCompleted вашей подписки.

РЕДАКТИРОВАТЬ: причина завершения программыпотому что RxJava порождает daemon потоков.В поисках хорошего источника я также нашел этот вопрос, который, вероятно, также отвечает на него.

0 голосов
/ 02 июля 2019

Возможно, вы путаетесь между потоками и наблюдаемыми,

Я использовал Observables в прошлом для таймера на плагине Minecraft, у меня есть событие, которое запускается каждую минуту.

public class TimerHandler extends Observable implements Runnable{

    @Override
    public void run() {
        this.setChanged();
        this.notifyObservers();
    }
}

Таким образом, это срабатывает каждую минуту, а затем для добавления событий в очередь таймера, вы просто подписываетесь на наблюдаемое значение, означающее, что подписанные вызовы запускаются каждую минуту.

public class PlotTimer implements Observer {

    @Override
    public void update(Observable o, Object arg) {
        ......

чтобы подписаться я звоню на следующий

getServer().getScheduler().scheduleAsyncRepeatingTask(this,timerHandler,1200,1200);
timerHandler.addObserver(new PayDayTimer());
timerHandler.addObserver(new ProfileTimer());
timerHandler.addObserver(new PlotTimer());
0 голосов
/ 02 июля 2019

@ sfiss прав, это работает так, как вы ожидаете:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

public class RxJavaExample {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Observable<String> hello = Observable.fromCallable(() -> getHello())
        .subscribeOn(Schedulers.from(exec));

    hello.subscribe();

    System.out.println("End of main!");

    exec.shutdown();
    exec.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static String getHello() {
    System.out.println("Hello called in " + Thread.currentThread().getName());
    return "Hello";
  }
}

со следующим выводом:

End of main!
Hello called in pool-1-thread-1
...