RxJava от вызова и отписки - PullRequest
0 голосов
/ 27 августа 2018

У меня есть следующий код (RxJava 1.3.8):

Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());

Subscription subscription1 = Completable.fromCallable(() -> {
  Thread.sleep(1000);
  System.out.println("first Callable executed");
  return 0;
})
    .subscribeOn(scheduler)
    .subscribe();

Subscription subscription2 = Completable.fromCallable(() -> {
  Thread.sleep(1000);
  System.out.println("second Callable executed");
  return 0;
})
    .subscribeOn(scheduler)
    .subscribe();

CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.addAll(subscription1, subscription2);
subscriptions.clear();

Вывод:

первый Выполненный вызов

второй Выполненный вызов

Вопрос - почему был казнен второй Callable?Перед запуском я ожидаю, что подписка проверена и выполнение отменено, если подписка отменена.

1 Ответ

0 голосов
/ 24 декабря 2018

Подписка проверяется только до того, как она выдаст значение, но может показаться, что она вызовет вызываемый объект независимо от этого.

public static Completable fromCallable(final Callable<?> callable) {
    requireNonNull(callable);
    return create(new OnSubscribe() {
        @Override
        public void call(rx.CompletableSubscriber s) {
            BooleanSubscription bs = new BooleanSubscription();
            s.onSubscribe(bs);
            try {
                // calls the callable regardless
                callable.call();
            } catch (Throwable e) {
                if (!bs.isUnsubscribed()) {
                    s.onError(e);
                }
                return;
            }
            // the emission of the complete is guarded
            if (!bs.isUnsubscribed()) {
                s.onCompleted();
            }
        }
    });
}

Так что если мы изменим ваш код следующим образом, добавив doOnComplete ккаждый.

Scheduler scheduler = Schedulers.from(Executors.newSingleThreadExecutor());

Subscription subscription1 = Completable.fromCallable(() -> {
  Thread.sleep(1000);
  System.out.println("first Callable executed");
  return 0;
})
    .doOnComplete(() -> System.out.println("first Completable complete"))
    .subscribeOn(scheduler)
    .subscribe();

Subscription subscription2 = Completable.fromCallable(() -> {
  Thread.sleep(1000);
  System.out.println("second Callable executed");
  return 0;
})
    .doOnComplete(() -> System.out.println("second Completable complete"))
    .subscribeOn(scheduler)
    .subscribe();

CompositeSubscription subscriptions = new CompositeSubscription();
subscriptions.addAll(subscription1, subscription2);
subscriptions.clear();

Мы получаем:

first Callable executed
second Callable executed

Если мы не очистим подписки, мы увидим дополнительный вывод, то есть

first Callable executed
first Completable complete
second Callable executed
second Completable complete
...