Использование Spring 5.0 WebClient для выполнения асинхронных вызовов - PullRequest
0 голосов
/ 18 января 2019

Я пытаюсь выполнять асинхронные POST-запросы через WebClient, так как это предпочтительный способ при использовании Spring Framework 5.0.

Вот мой код сервера:

package com.example.demo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
@Slf4j
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @PostMapping(value = "/createCustomer", produces = MediaType.APPLICATION_JSON_UTF8_VALUE, consumes = "text/plain")
    public Customer sendStream(@RequestBody String number) throws InterruptedException {
        Thread.sleep(40);
        return new Customer(number, "random" + number);
    }

    @Data
    @AllArgsConstructor
    class Customer {
        String id;
        String name;
    }


}

Код клиента, который у меня есть:

    package com.example.demo2;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.stream.IntStream;

@SpringBootApplication
@Slf4j
public class Demo2Application implements CommandLineRunner {


    public static void main(String[] args) {
        SpringApplication.run(Demo2Application.class, args);
    }


    @Override
    public void run(String... args) throws Exception {
        WebClient webClient = WebClient.create("http://localhost:8080");
        IntStream.range(0, 100000).forEach(i -> {
            Flux.merge(
                    webClient
                            .post()
                            .uri("/createCustomer")
                            .body(BodyInserters.fromObject(String.valueOf(i)))
                            .retrieve().bodyToMono(String.class)
            )
                    .parallel()
                    .runOn(Schedulers.single())
                    .subscribe(log::info);


        });

    }
}

Когда я запускаю этот код, часть запросов выполняется успешно, но затем я получаю следующие исключения.

2019-01-18 00:39:34.716  INFO 38236 --- [       single-1] com.example.demo2.Demo2Application       : {"id":"1049","name":"random1049"} 2019-01-18 00:39:34.731 ERROR 38236
--- [       single-1] reactor.core.scheduler.Schedulers        : Scheduler worker in group main failed with an uncaught exception

reactor.core.Exceptions$ErrorCallbackNotImplemented: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:8080 Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: no further information: localhost/127.0.0.1:8080   at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]  at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779) ~[na:na]    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]     at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:636) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]   at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462) ~[netty-transport-4.1.31.Final.jar:4.1.31.Final]    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897) ~[netty-common-4.1.31.Final.jar:4.1.31.Final]   at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]     Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:  Assembly trace from producer [reactor.core.publisher.MonoCreate] :  reactor.core.publisher.Mono.create(Mono.java:183)   reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:121) Error has been observed by the following operator(s):   |_  Mono.create ⇢ reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:121)   |_  Mono.create ⇢ 
reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:289)     |_  Mono.retry ⇢ reactor.netty.http.client.HttpClientConnect$MonoHttpConnect.subscribe(HttpClientConnect.java:324)  |_  Mono.flatMapMany ⇢ reactor.netty.http.client.HttpClientFinalizer.responseConnection(HttpClientFinalizer.java:85)    |_  Flux.next ⇢ org.springframework.http.client.reactive.ReactorClientHttpConnector.connect(ReactorClientHttpConnector.java:112)    |_  Mono.doOnRequest ⇢ org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:104)  |_  Mono.doOnCancel ⇢ org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:105)   |_  Mono.map ⇢ org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.exchange(ExchangeFunctions.java:106)  |_  Mono.switchIfEmpty ⇢ org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchange(DefaultWebClient.java:319)    |_  Mono.flatMap ⇢ org.springframework.web.reactive.function.client.DefaultWebClient$DefaultResponseSpec.bodyToMono(DefaultWebClient.java:429)  |_  Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)       |_  Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)           |_  Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
                |_  Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
                    |_  Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
                        |_  Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
                            |_  Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
                                |_  Flux.parallel ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:35)
                                |_  ParallelFlux.runOn ⇢ com.example.demo2.Demo2Application.lambda$run$0(Demo2Application.java:36)

Caused by: java.net.ConnectException: Connection refused: no further information    ... 10 common frames omitted

Что я делаю неправильно? Я пытался ограничить количество потоков (как видно из кода клиента, я использую один поток), но это не помогло.

Буду очень благодарен за любую помощь.

Это ошибка?

...