Основной поток не ждет, пока подписчики завершат свою задачу в реактивном подписчике - PullRequest
1 голос
/ 09 января 2020

У меня есть служба весной, которая должна извлекать данные, используя десять различных методов.

Я хотел бы, чтобы эти методы выполнялись параллельно для выполнения некоторых операций с БД и возврата в родительский поток. Но родительский поток должен подождать, пока все ответы придут, и затем вернуть ответ.

В моем текущем подходе я использую реактивный моно для выполнения всех методов асинхронно, но основной поток не ожидает методов подписчика для fini sh.

Ниже приведены мои два метода, на которые я подписался

private Mono<BaseResponse> getProfileDetails(long profileId){
        return new Mono<BaseResponse>() {

            @Override
            public void subscribe(Subscriber<? super BaseResponse> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // DB Operation
                System.out.println("Inside getProfileDetails");
                s.onNext(new BaseResponse());
            }
        };
    }

private Mono<Address> getAddressDetails(long profileId){
        return new Mono<Address>() {

            @Override
            public void subscribe(Subscriber<? super Address> s) {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // DB Operation
                System.out.println("Inside getAddressDetails");
                s.onNext(new Address());
            }
        };
    }

И ниже мой основной метод

public BaseResponse getDetails(long profileId){
        ExecutorService executors = Executors.newFixedThreadPool(2);

        Mono<BaseResponse> profileDetail = this.getProfileDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));
        Mono<BaseResponse> addressDetail = this.getAddressDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));

        List<BaseResponse> list = new ArrayList<>();

        profileDetail.mergeWith(addressDetail)
        .subscribe(consumer -> {
                list.add(consumer);
        });

        System.out.println("list: "+new Gson().toJson(list));
        executors.shutdown();

        return response;
    }

Ниже мой вывод:

list: []
Inside getProfileDetails
Inside getAddressDetails

Мой вывод показывает, что основной поток не ждет, пока подписчик завершит свою sh свою задачу, так как я могу справиться с этой ситуацией?

1 Ответ

2 голосов
/ 09 января 2020

Я предполагаю, что ваши getProfileDetails() и getAddressDetails() методы являются просто заполнителями, поскольку они не имеют особого смысла, как написано.

При этом, если это все ваше приложение здесь, и Вы искренне просто хотите заблокировать перед завершением, вы также можете просто изменить текущий вызов subscribe() на doOnNext(), тогда просто blockLast():

profileDetail.mergeWith(addressDetail)
.doOnNext(consumer -> {
        list.add(consumer);
})
.blockLast();

Блокировка на реактивных потоках обычно плохо советуют в реактивных приложениях по уважительной причине, но в этом случае вы буквально просто хотите заблокировать перед тем, как выйти, так что я не вижу здесь большого недостатка.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...