Я почти уверен, что это связано с исполнением на AndroidSchedulers.mainThread()
, на котором вы observe
ваш Single
.Пожалуйста, убедитесь, что этот поток работает и не прерывается до того, как ваш код будет выполнен.
К сожалению, у меня в руках нет среды Android, и ваш код не завершен.Но в качестве демонстрации я переписал ваш код с помощью фиктивных методов для стандартной Java.Выглядит так:
package com.dpopov.rxjava.stackoverflow;
import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import java.util.Collections;
import java.util.List;
import java.util.Set;
public class ObservableNotExecuting {
public static void main(String[] args) throws InterruptedException {
getAccount("accName", "accPass");
Thread.sleep(5000);
}
private static void getAccount(String accountName, String password) {
System.out.println("ACCOUNT_" + "any message"); // this is executed
Single.fromCallable(() -> {
String account = accountName;
System.out.println("ACCOUNT_" + account); // not executed
return account;
})
.flatMap((Function<String, SingleSource<?>>) ObservableNotExecuting::findAccount)
.subscribeOn(Schedulers.io())
// .observeOn(Schedulers.mainThread())
// .observeOn(Schedulers.mainThread())
.subscribe(
accounts -> {
// accountObject -> {
// Account accountModel = accounts.get(0); // not executed
// System.out.println("ACCOUNT_" + accounts.name); // not executed
final Account account = ((List<Account>) accounts).get(0);
System.out.println("account: " + account.name);
},
throwable -> {
System.out.println("ACCOUNT_" + "BAD ERROR"); // not executed
}
);
}
private static SingleSource<List<Account>> findAccount(final String accountName) {
return new Single<List<Account>>() {
@Override
protected void subscribeActual(final SingleObserver<? super List<Account>> observer) {
final Account account = new Account(accountName);
final List<Account> accountsList = Collections.singletonList(account);
observer.onSuccess(accountsList);
}
};
}
static class Account {
public final String name;
public Account(final String name) {
this.name = name;
}
}
}
Вы можете попробовать выполнить это локально.Основное замечание таково: если вы удалите Thread.sleep
строку в #main
, на выход будет записана только первая строка, поскольку поток main
завершается сразу после возврата #getAccount
, до того, как начинает выполняться любая логика Observable
.
Поэтому посмотрите, как работает ваш поток пользовательского интерфейса Android при выполнении кода RxJava.