возвращение подписчика в RxJava после сохранения выборки данных из веб-сервиса - PullRequest
0 голосов
/ 26 июня 2018

Я пытаюсь вызвать веб-сервис для получения данных и сохранения их в базе данных, используя следующий код. Я создал отдельный класс для выполнения следующей операции.

Теперь проблема в том, что я хочу уведомить о своей активности, когда я успешно получу и сохраню данные в базе данных. если возникает какая-то ошибка, я хочу показать это на самом интерфейсе пользователя.

каким-то образом я могу написать код для извлечения данных с использованием нумерации страниц, но не уверен, как бы я уведомил пользовательский интерфейс, где я могу подписаться, чтобы получить обновление, связанное с прогрессом и ошибкой, если таковые имеются.

public Flowable<Response> getFitnessData() {

        Request request = new Request();
        request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

        Flowable<Response> fitnessFlowable = new WebRequest()
                                            .getRemoteClient()
                                            .create(FitnessApi.class)
                                            .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


         fitnessFlowable.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .takeUntil(response->response.getSummary().getNext()!=null)

                .subscribe(new Subscriber<Response>() {
                    @Override
                    public void onSubscribe(Subscription s) {

                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Response response) {

                        Log.e(TAG, "onNext" );

                        if(response !=null){

                            if(response.getFitness()!=null && response.getFitness().size()!=0){

                                Realm realm = Realm.getDefaultInstance();
                                realm.executeTransactionAsync(new Realm.Transaction() {
                                    @Override
                                    public void execute(Realm realm) {

                                        realm.copyToRealmOrUpdate(response.getFitness());

                                    }
                                }, new Realm.Transaction.OnSuccess() {
                                    @Override
                                    public void onSuccess() {

                                        Log.i(TAG, "onSuccess , fitness data saved");

                                    }
                                }, new Realm.Transaction.OnError() {
                                    @Override
                                    public void onError(Throwable error) {
                                        Log.i(TAG, "onError , fitness data failed to save"+error.getMessage() );
                                    }
                                });
                            }else{

                                Log.i(TAG, "onError , no fitness data available" );


                            }

                        }else{
                            Log.i(TAG, "onError , response is null" );

                        }
                    }

                    @Override
                    public void onError(Throwable t) {


                        Log.e(TAG, "onError" +t.getMessage());
                    }

                    @Override
                    public void onComplete() {

                        Log.e(TAG, "onComplete");
                    }
                });;

            return null;

    }

Обновлено

Создано RxBus для распространения событий и ошибок в пользовательском интерфейсе

public class RxBus {

    private static final RxBus INSTANCE = new RxBus();

    private RxBus(){}
    private PublishSubject<Object> bus = PublishSubject.create();

    public static RxBus getInstance() {
        return INSTANCE;
    }


    public void send(Object o) {
        bus.onNext(o);
    }

    public void error(Throwable e){bus.onError(e);}

    public Observable<Object> toObservable() {
        return bus;
    }
}

в деятельности

 FitnessRepo fitnessRepo = new FitnessRepo();
        fitnessRepo.getFitnessData();
        RxBus.getInstance().toObservable().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object o) {

                if(o instanceof RealmList ){

                    RealmList<Fitness> realmList = (RealmList<Fitness>) o;
                    Log.e(TAG,"Fitness data size "+realmList.size());

                }
            }

            @Override
            public void onError(Throwable e) {

                Log.e(TAG,e.getMessage()+ "");

                if (e instanceof HttpException) {
                    ResponseBody body = ((HttpException) e).response().errorBody();


                    try {

                        Response response=  LoganSquare.parse(body.byteStream(),Response.class);

                        if(response.getErrors() !=null)
                            if(response.getErrors().size() > 0)
                                Log.e(TAG, "Error "+response.getErrors().get(0).getErrors());
                    } catch (IOException t) {
                        t.printStackTrace();
                    }

                }
            }

            @Override
            public void onComplete() {

            }
        });

в вызове веб-службы

public void getFitnessData() {


        Request request = new Request();
        request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");
        request.setEnd_date("2018-07-01T00:00:00");
        Flowable<Response> fitnessFlowable = new WebRequest()
                .getRemoteClient()
                .create(FitnessApi.class)
                .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


         fitnessFlowable.subscribeOn(Schedulers.io())
                .takeUntil(response->response.getSummary().getNext()!=null)
                .doOnNext((response) -> {
                    if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) {


                        Log.e(TAG, " Error ");
                        return;
                    }

                    RxBus.getInstance().send(response.getFitness());

                    try(Realm r = Realm.getDefaultInstance()) {
                        r.executeTransaction((realm) -> {
                            realm.copyToRealmOrUpdate(response.getFitness());
                        });
                    }
                }).subscribe(item ->{


                 },
                 error ->{

                     RxBus.getInstance().error(error);


                 });
    }

Ответы [ 2 ]

0 голосов
/ 27 июня 2018

Есть разные способы достичь этого. Я никогда не буду обрабатывать подписки самостоятельно вне рамок жизненного цикла, поскольку это может привести к утечке памяти. В вашем случае кажется, что успех и неудача связаны с пользовательским интерфейсом, поэтому вы можете просто сделать это.

public Completable fetchFitnessData() {

Request request = new Request();
request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

Flowable<Response> fitnessFlowable = new WebRequest()
                                    .getRemoteClient()
                                    .create(FitnessApi.class)
                                    .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


 return fitnessFlowable.subscribeOn(Schedulers.io())
        .takeUntil(response->response.getSummary().getNext()!=null)
        .doOnNext((response) -> {
                if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) return;

                try(Realm r = Realm.getDefaultInstance()) {
                    r.executeTransaction((realm) -> {
                        realm.insertOrUpdate(response.getFitness());
                    });
                }
            }
        }).ignoreElements();

}

На уровне пользовательского интерфейса вы можете обрабатывать подписку как с успехом, так и с ошибкой. Если вам нужен успех, модель может заменить Completable на Single или Flowable.

fetchFitnessData.subscrible(Functions.EMPTY_ACTION, Timber::d);

Основным преимуществом этого подхода является то, что вы обрабатываете свои жизненные циклы подписки.

0 голосов
/ 27 июня 2018

У меня для вас хорошие новости! Вы можете удалить почти весь этот код и в результате просто улучшить его!

public void fetchFitnessData() {

    Request request = new Request();
    request.setAccess_token("d80fa6bd6f78cc704104d61146c599bc94b82ca225349ee68762fc6c70d2dcf0");

    Flowable<Response> fitnessFlowable = new WebRequest()
                                        .getRemoteClient()
                                        .create(FitnessApi.class)
                                        .getFitnessData("5b238abb4d3590001d9b94a8",request.toMap());


     fitnessFlowable.subscribeOn(Schedulers.io())
            .takeUntil(response->response.getSummary().getNext()!=null)
            .doOnNext((response) -> {
                    if(response ==null || response.getFitness() == null || response.getFitness().isEmpty()) return;

                    try(Realm r = Realm.getDefaultInstance()) {
                        r.executeTransaction((realm) -> {
                            realm.insertOrUpdate(response.getFitness());
                        });
                    }
                }
            }).subscribe();
}

Этот метод теперь работает в фоновом потоке и возвращает void, поэтому для вывода чего-либо из этого метода можно использовать PublishSubject (один для успеха, другой для отказа) или EventBus .

private PublishSubject<Object> fitnessResults;
public Observable<Object> observeFitnessResults() {
    return fitnessResults;
}

public static class Success {
    public Success(List<Fitness> data) {
        this.data = data;
    }

    public List<Fitness> data;
}

public static class Failure {
    public Failure(Exception exception) {
        this.exception = exception;
    }

    public Exception exception;
}

public void fetchFitnessData() {
    ...
        fitnessResults.onNext(new Success(data));
    } catch(Exception e) {
        fitnessResults.onNext(new Failure(e));

А потом

errors = observeFitnessResults().ofType(Error.class);
success = observeFitnessResults().ofType(Success.class);
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...