Я создал новый образец и вставил коды на стороне клиента и сервера.
Полные коды можно найти здесь .
Существует 3 версии сервера side.
- сервер Нет Приложение Spring Boot, использующее Spring Integration RSocket InboundGateway.
- загрузка с сервера Повторное использование автоконфигурации Spring RSocket и создал
ServerRSocketConnecter
- ServerRSocketMessageHanlder
. - server-boot-messsagemapping Не использовать Spring Integration, просто используйте автоконфигурацию Spring Boot RSocket, а также
@Controller
и @MessageMapping
.
Существует 2 версии клиента.
- клиент , отправка сообщений с использованием Spring Integration Rocket OutboundGateway.
- клиент-запросчик Отправлять сообщения с использованием
RSocketRequester
, вообще не использовать Spring Integration.
Режим взаимодействия клиента и сервера - REQUEST_CHANNEL, а соединение с сервером по TCP / localhost: 7000.
сервер
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
Класс приложения:
@Configuration
@ComponentScan
@IntegrationComponentScan
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
try (ConfigurableApplicationContext ctx = new AnnotationConfigApplicationContext(DemoApplication.class)) {
System.out.println("Press any key to exit.");
System.in.read();
} finally {
System.out.println("Exited.");
}
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
return new ServerRSocketConnector("localhost", 7000);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
загрузка сервера
Зависимости в пом. xml.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
application.properties
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
Класс приложения.
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) throws IOException {
SpringApplication.run(DemoApplication.class, args);
}
// see PR: https://github.com/spring-projects/spring-boot/pull/18834
@Bean
ServerRSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies) {
var handler = new ServerRSocketMessageHandler(true);
handler.setRSocketStrategies(rSocketStrategies);
return handler;
}
@Bean
public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler serverRSocketMessageHandler) {
return new ServerRSocketConnector(serverRSocketMessageHandler);
}
@Bean
public IntegrationFlow rsocketUpperCaseFlow(ServerRSocketConnector serverRSocketConnector) {
return IntegrationFlows
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel)
.rsocketConnector(serverRSocketConnector)
)
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toUpperCase))
.get();
}
}
server-boot-messagemapping
Зависимости в пом. xml.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
Приложение.properties.
spring.rsocket.server.port=7000
spring.rsocket.server.transport=tcp
Класс applcition.
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@Controller
class UpperCaseHandler {
@MessageMapping("/uppercase")
public Flux<String> uppercase(Flux<String> input) {
return input.map(String::toUpperCase);
}
}
client
В клиенте зависимости в pom. xml похожи на
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
</dependency>
Класс приложения:
@SpringBootApplication
@EnableIntegration
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector("localhost", 7000);
clientRSocketConnector.setAutoStartup(false);
return clientRSocketConnector;
}
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlows
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel((message) -> RSocketInteractionModel.requestChannel)
.expectedResponseType("T(java.lang.String)")
.clientRSocketConnector(clientRSocketConnector))
.get();
}
}
@RestController
class HelloController {
@Autowired()
@Lazy
@Qualifier("rsocketUpperCaseRequestFlow.gateway")
private Function<Flux<String>, Flux<String>> rsocketUpperCaseFlowFunction;
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return rsocketUpperCaseFlowFunction.apply(Flux.just("a", "b", "c", "d"));
}
}
При запуске клиентского и серверного приложения и попытайтесь получить доступ к http://localhost:8080/hello
по curl
.
При использовании server и server-boot , который использует InboundGateway для обработки сообщений, вывод выглядит следующим образом.
curl http://localhost:8080/hello
data:ABCD
При использовании server-boot-messagemapping , вывод проснулся, как я и ожидал:
data:A
data:B
data:C
data:D
клиент-запросчик
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
Класс приложения:
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@RestController
class HelloController {
Mono<RSocketRequester> requesterMono;
public HelloController(RSocketRequester.Builder builder) {
this.requesterMono = builder.connectTcp("localhost", 7000);
}
@GetMapping(value = "hello", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> uppercase() {
return requesterMono.flatMapMany(
rSocketRequester -> rSocketRequester.route("/uppercase")
.data(Flux.just("a", "b", "c", "d"))
.retrieveFlux(String.class)
);
}
}
При запуске этого клиента и 3 серверов попробуйте чтобы получить доступ к * 11 02 * по curl
.
При использовании server и server-boot , который использует InboundGateway для обработки сообщений, он генерирует исключение приведения класса.
При использовании server-boot-messagemapping вывод работает как я и ожидал:
data:A
data:B
data:C
data:D
Я не знаю, в чем проблема конфигурации InboundGateway и OutboundGateway?