Синхронный весенний вызов webflux с сохранением порядка работы - PullRequest
0 голосов
/ 05 апреля 2019

У меня есть простой пример использования, и я не знаю, как это сделать, так как я новичок в Spring Webflux.

Я использую spring boot webflux starters.Мне нужно позвонить 2 конечных точки.Скажем, Endpoint1 и Endpoint2.

Всякий раз, когда на запрос Endpoint1 попадает запрос, я должен сначала нажать Endpoint2 с тем же запросом и использовать ответ от Endpoint2, чтобы обогатитьОригинальный запрос, а затем сделать что-то дальше.* Объект запроса Endpoint1 должен быть обогащен, используя ответ от Endpoint2, прежде чем что-либо делать.Как применить этот порядок с помощью Spring webflux?В моем случае исходный объект запроса не обогащается, прежде чем его можно будет использовать дальше.Любая помощь в этом очень ценится !!!

К вашему сведению - вызов Endpoint2 осуществляется с помощью webclient

Просто псевдокод:

public Mono<Response1> endpoint1(Request1 request1){

  Flux<Response2> reponse2 = webclient.getEndpoint2(request1); // Returns a Flux

  //use the above reponse2 to enrich the request1

  return webclient.getSomething(request1); //Returns Mono<Response1>

}

Фактический код:


 public Mono<ApplicationResponse> save(ApplicationRequest request) {

        return Mono.subscriberContext().flatMap(ctx -> {

            Mono blockingWrapper =  Mono.fromCallable(() ->
                    service.getId(request)
                            .subscriberContext(ctx)
                            .subscribe(id -> request.setId(id))
            ).subscribeOn(Schedulers.elastic());

            return blockingWrapper.flatMap(o -> authService.getAccessToken()
                    .flatMap(token -> post("/save", request,
                            token.getAccessToken(),
                            ctx)
                            .bodyToMono(ApplicationResponse.class))
                    .log());
        });
    }

Ответы [ 3 ]

2 голосов
/ 05 апреля 2019

Если вы уверены, что у вас будет Flux с getEndpoint2 (request1), в этом случае вы можете использовать collectList ():

return webclient.getEndpoint2(request1) // Flux<Response2>
         .collectList() // Mono<List<Response2>>
         .flatMap(list -> {
            // ... should handle empty list if needed
            finalRequest = createRequest(request1, list);
            return webclient.getSomething(finalRequest); // Mono<Response1>
         });
0 голосов
/ 07 апреля 2019

Ваша проблема исходит от второго .subscriberContext().Это статический метод, который создает новый Mono, что означает, что код до того, как он никогда не выполнится, поэтому объект request не изменяется.

В любом случае, ваш код грязный.Сделай это проще.Насколько я читаю твой код, тебе вообще не нужно Flux.feesService.calculateApplicationFees(...) должен вернуть Mono<List<FeeItem>>.Слишком много ненужных .log() или Mono.subscriberContext().Тебе вообще нужен контекст?

0 голосов
/ 06 апреля 2019

Я вижу, что происходит что-то интересное.Это работает, как и ожидалось, если я управляю этим из класса Controller, тогда как если я вызываю сервис из моего класса Controller, который управляет этим потоком, он, кажется, не работает должным образом.Просто интересно, что мне не хватает?или это как работать?

Это рабочий код:

@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {

    private final ApplicationService applicationService;
    private final ApplicationRequestMapper requestMapper;
    private final FeesService feesService;

    @PostMapping(value = "/save")
    public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {

        ApplicationRequest applicationRequest = requestMapper.apply(request);

        return Mono.subscriberContext()
                .flatMap(context -> feesService.calculateApplicationFees(applicationRequest)
                        .collectList())
                .map(feeItems -> applicationRequest.getFeeItems().addAll(feeItems))
                .flatMap(isRequestEnriched -> applicationService.saveApplication(applicationRequest)
                        .map(saveApplicationResponse -> {
                            Application application = new Application();
                            application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
                            return application;
                        }))
                .onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
                        request.getLicenceId()),
                        throwable, true, false))
                .log();
    }
}


@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{

     private final AuthenticationService authenticationService;  

         public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {

            return Mono.subscriberContext()
                .flatMap(context -> authenticationService.getAccessToken()
                        .flatMap(token -> post("/save",
                                request,
                                token.getAccessToken(),
                                context)
                                .bodyToMono(SaveApplicationResponse.class))
                        .log());
    }
}



@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{

     private final AuthenticationService authenticationService;  

        public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {

        return Mono.subscriberContext()
                .flatMap(ctx -> authenticationService.getAccessToken()
                        .flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
                                .bodyToMono(FeeResponse.class))
                        .log())
                .flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
    }
}

Не работает, если я делаю это. То есть, запрос никогда не обогащается вообще:



@RestController
@RequestMapping("/applications")
@Slf4j
@RequiredArgsConstructor
public class ApplicationController {

    private final ApplicationService applicationService;
    private final ApplicationRequestMapper requestMapper;

     @PostMapping(value = "/save")
        public Mono<Application> saveApplication(@RequestBody ApplicationRequest request) {
            return Mono.subscriberContext()
                    .flatMap(context -> applicationService.saveApplication(requestMapper.apply(request))
                            .map(saveApplicationResponse -> {
                                Application application = new Application();
                                application.setLicenceId(saveApplicationResponse.getResponse().getLicenceNumber());
                                return application;
                            }))
                    .onErrorMap(throwable -> new ApplicationException(String.format(SAVE_ERROR_MESSAGE,
                            request.getLicenceId()),
                            throwable, true, false))
                    .log();
        }

}

@Service
@Slf4j
@RequiredArgsConstructor
public class ApplicationService extends ClientService{

     private final AuthenticationService authenticationService;
     private final FeesService feesService;


         public Mono<SaveApplicationResponse> saveApplication(ApplicationRequest request) {

            return Mono.subscriberContext()
                    .flatMap(context -> feesService.calculateApplicationFees(request)
                            .collectList())
                    .map(feeItems -> request.getFeeItems().addAll(feeItems))
                    .subscriberContext()
                    .flatMap(context -> authenticationService.getAccessToken()
                            .flatMap(token -> post("/save",
                                    request,
                                    token.getAccessToken(),
                                    context)
                                    .bodyToMono(SaveApplicationResponse.class))
                            .log());
        }
}



@Service
@Slf4j
@RequiredArgsConstructor
public class FeesService extends ClientService{

     private final AuthenticationService authenticationService;  

        public Flux<FeeItem> calculateApplicationFees(ApplicationRequest request) {

        return Mono.subscriberContext()
                .flatMap(ctx -> authenticationService.getAccessToken()
                        .flatMap(token -> get("/fees", request, token.getAccessToken(), ctx)
                                .bodyToMono(FeeResponse.class))
                        .log())
                .flatMapMany(rsp -> Flux.fromIterable(rsp.getFeeItems()));
    }
}

...