Подписка наблюдаемой внутри другой наблюдаемой - PullRequest
0 голосов
/ 03 июля 2018

Я отлаживал свой код, который выбирает UserWallet s из базы данных, а затем генерирует адреса для них, подключаясь к внешнему REST API. Теперь у меня есть подписка, вложенная в другую подписку, но я прочитал, что это плохое решение (на самом деле это не работает, и я думаю, что причина).

userWalletDao.getUnregisteredUserWallets()
                .subscribe(nextWallet -> {
                    log.info("Fetched next wallet for registration {}", nextWallet);
                    blockchainIntegration.registerUserWallet(nextWallet.getUserId())
                            .subscribe(address -> {
                                nextWallet.setAddress(address);
                                userWalletDao.persistUserWalletAddress(nextWallet);
                                log.info("Registered wallet {} with address {}.", nextWallet, address);
                            });
                });

Я пытался сделать это за одну подписку, но если я отправляю бумажники по адресам, я теряю объект UserWallet, чтобы установить для него извлеченный адрес и сохранить его обратно в базе данных.

Как я могу получить кошельки и затем вызвать API, чтобы сгенерировать для него адрес с одной подпиской?

getUnregisteredUserWallets() возвращает Observable<UserWallet> и registerUserWallet() возвращает Single<String>.

1 Ответ

0 голосов
/ 03 июля 2018

Настоятельно рекомендуется прочитать и понять о зависимых подпотоках, упомянутых в первом комментарии.

Вы можете решить свою проблему, изменив наблюдаемую последовательность на что-то вроде этого

       userWalletDao.getUnregisteredUserWallets()
                .flatMap(nextWallet -> registerUserWallet(nextWallet.getUserId()).toObservable()
                        .flatMap(address -> Observable.fromCallable(() -> new Pair<>(nextWallet, address))))  // return both wallet from previous mapping and address from current mapping to the next level
                .flatMapCompletable(walletAddressPair -> Completable.fromAction(()->{
                    Wallet nextWallet = walletAddressPair.first;
                    String address = walletAddressPair.second;
                    nextWallet.setAddress(address);
                    userWalletDao.persistUserWalletAddress(nextWallet);
                    log.info("Registered wallet {} with address {}.", nextWallet, address);
                    // here wallet and address have been saved to db. This operation is a completable action, you don't have to return any result 
                    // from it and forward to the next level.  Thats why flatMapCompletable is used.
                }))
                .subscribeWith(new DisposableCompletableObserver() {
                    @Override
                    public void onComplete() {
                       // All actions completed
                    }

                    @Override
                    public void onError(Throwable e) {
                      // any error occurred in the observable chain
                    }
                });
...