Как использовать реактивный поток веб-клиента в заблокированном методе? - PullRequest
0 голосов
/ 07 марта 2020

Я ищу как справку по правильному способу написания этого, так и некоторые рекомендации для более глубокого понимания того, как работают блокирующие потоки. Я знаю, что я пытаюсь сделать это неправильно, но я не могу понять, почему, именно поэтому я не могу это исправить.

У меня есть поток в некоторых Java код, который сначала выполняет интеграционный вызов внешней службы, а вызов выполняет базу данных, каждый из которых возвращает объекты Mono.

Mono<UserProfileResource> userProfileResourceMono = integrationService.retrieveUserProfileByUserId(userId);
Mono<ScheduleDoc> scheduleDocMono = scheduleReadRepository.findDocumentByIdMono(docId);

Ни один из этих потоков не зависит друг от друга, но они оба необходимы для последующий поток, поэтому я могу сжать их вместе и вызвать их асинхронно. Эта проблема, которую я имею, находится в части функции zip lambda следующего кода. Здесь происходит больше, но я уверен, что эти фрагменты не актуальны, поэтому я для краткости отредактировал свой код. Используя информацию из этого ресурса и документа, я создаю полезную нагрузку для POST для другой внешней системы, что я делаю, используя реактивный веб-клиент. Я могу получить ответ как Mono из этого потока, но я не могу заблокировать его, чтобы использовать его для дальнейших манипуляций перед сохранением чего-либо в моей базе данных.

Mono.zip(userProfileResourceMono , scheduleDocMono ).flatMap(objects -> {
        List<UserProfileResource> userProfileResource = objects.getT1();
        ScheduleDoc scheduleDoc = objects.getT2();

        PostResource postResource = new PostResource(userProfileResource, scheduleDoc);
        String uri = "myExternalService";
        Mono<String> bodyResponseMono = webClient.post().uri(uri)
            .body(BodyInserters.fromObject(postResource))
            .retrieve()
            .onStatus(HttpStatus::isError, response -> logWebClientError(uri, response))
            .bodyToMono(String.class);
        String bodyResponse = bodyResponseMono.block();

        //Do stuff to scheduleDoc 
        database.save(scheduleDoc)
        }).block();

Итак, проблема в том, когда происходит блокировка bodyResponseMono, которую я получаю из моего POST. Я получаю следующее исключение:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4

Из того, что я читал, я не могу заблокировать в заблокированном потоке, потому что поток уже заблокирован. Я хотел бы понять это лучше, но интуитивно это имеет смысл. Но как я могу использовать реактивный поток внутри этого Mono?

У меня есть пара теорий альтернатив, которые я могу здесь сделать, но мне любопытно, что было бы оптимальным.

Вариант 1) : создать мой PostResource в лямбда-потоке Mono.zip, а затем вернуть его из этого потока и продолжить. Проблема с представленной ниже идеей - очевидная явная дыра, которая необходима для доступа к документу, чтобы сохранить его, но он доступен только в заблокированном методе asyn c. Я только хочу обновить базу данных, если POST успешен, поэтому я не могу сделать это в первую очередь. Я предполагал, что смогу создать объект для возврата postBody И расписания Do c, но это становится слишком тяжелым для того, что я пытаюсь выполнить sh.

Mono.zip(userProfileResourceMono , scheduleDocMono ).flatMap(objects -> {
        List<UserProfileResource> userProfileResource = objects.getT1();
        ScheduleDoc scheduleDoc = objects.getT2();

        PostResource postResource = new PostResource(userProfileResource, scheduleDoc);
        }).block();

    String uri = "myExternalService";
    Mono<String> bodyResponseMono = webClient.post().uri(uri)
        .body(BodyInserters.fromObject(postResource))
        .retrieve()
        .onStatus(HttpStatus::isError, response -> logWebClientError(uri, response))
        .bodyToMono(String.class);
    String bodyResponse = bodyResponseMono.block();

    //Do stuff to scheduleDoc 
    database.save(scheduleDoc)

Вариант 2 ) : То же, что и выше, но вместо postResource вернуть bodyMonoResponse и проверить его, прежде чем вносить дальнейшие изменения и сохранять в базу данных. Кажется, это имеет больше смысла, чем вариант 1, но все еще имеет ту же проблему, где у меня нет документа.

Вариант 3) : использовать restTemplate или какой-либо вид неблокирования рамки, чтобы сделать мой запрос POST к службе интеграции. В идеале я не хочу писать метод restTemplate только для этого конкретного c сценария. Я думаю, что должен быть какой-то способ сделать это с веб-клиентом, который я еще не смог найти. Это идеальный сценарий.

...