Как получить значение из Single Observable в rxjava? - PullRequest
0 голосов
/ 14 июня 2019
private void getAccount(String accountName,String password) {
    Log.i("ACCOUNT_", "any message"); //this is executed
    Single.fromCallable(() -> {
        String account = accountName;
        Log.i("ACCOUNT_", account); //not executed
        return account;
    }).flatMap((accountName) ->{
        return accountRepository.findAccount(accountName);
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe((List<Account> accounts) -> {
                Account accountModel = accounts.get(0);//not executed
                Log.i("ACCOUNT_", accountModel.getName());//not executed
            },throwable -> {
               Log.i("ACCOUNT_", "BAD EROR");//not executed
      });

}

Я обновил код, комментируя, какие части не выполняются, когда я вызываю getAcount() метод. В чем может быть причина?

1 Ответ

0 голосов
/ 15 июня 2019

Я почти уверен, что это связано с исполнением на AndroidSchedulers.mainThread(), на котором вы observe ваш Single.Пожалуйста, убедитесь, что этот поток работает и не прерывается до того, как ваш код будет выполнен.

К сожалению, у меня в руках нет среды Android, и ваш код не завершен.Но в качестве демонстрации я переписал ваш код с помощью фиктивных методов для стандартной Java.Выглядит так:

package com.dpopov.rxjava.stackoverflow;

import io.reactivex.Single;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

import java.util.Collections;
import java.util.List;
import java.util.Set;

public class ObservableNotExecuting {

    public static void main(String[] args) throws InterruptedException {
        getAccount("accName", "accPass");

        Thread.sleep(5000);
    }

    private static void getAccount(String accountName, String password) {
        System.out.println("ACCOUNT_" + "any message"); // this is executed

        Single.fromCallable(() -> {
            String account = accountName;
            System.out.println("ACCOUNT_" + account); // not executed
            return account;
        })
                .flatMap((Function<String, SingleSource<?>>) ObservableNotExecuting::findAccount)
                .subscribeOn(Schedulers.io())
//                .observeOn(Schedulers.mainThread())
//                .observeOn(Schedulers.mainThread())
                .subscribe(
                        accounts -> {
//                        accountObject -> {
//                            Account accountModel = accounts.get(0); // not executed
//                            System.out.println("ACCOUNT_" + accounts.name); // not executed
                            final Account account = ((List<Account>) accounts).get(0);
                            System.out.println("account: " + account.name);
                        },
                        throwable -> {
                            System.out.println("ACCOUNT_" + "BAD ERROR"); // not executed
                        }
                );
    }

    private static SingleSource<List<Account>> findAccount(final String accountName) {
        return new Single<List<Account>>() {
            @Override
            protected void subscribeActual(final SingleObserver<? super List<Account>> observer) {
                final Account account = new Account(accountName);
                final List<Account> accountsList = Collections.singletonList(account);
                observer.onSuccess(accountsList);
            }
        };
    }

    static class Account {
        public final String name;

        public Account(final String name) {
            this.name = name;
        }
    }
}

Вы можете попробовать выполнить это локально.Основное замечание таково: если вы удалите Thread.sleep строку в #main, на выход будет записана только первая строка, поскольку поток main завершается сразу после возврата #getAccount, до того, как начинает выполняться любая логика Observable.

Поэтому посмотрите, как работает ваш поток пользовательского интерфейса Android при выполнении кода RxJava.

...