Я пытаюсь использовать WebFlux с RSocket. В примере приложения есть серверные и клиентские приложения. оба работают на WebFlux и RSocket, мой тип связи rsocket - поток запросов. Приложение клиент-сервер отлично работает для пары одновременных запросов, однако, когда я загружаю тест с 1000qps с 8 потоками, запросы начинают зависать. В ходе исследования, приведенного ниже, образец кода прошел нагрузочный тест.
РАБОЧИЙ ОБРАЗЕЦ
RSocketClientConfig. java
public class RSocketClientConfig {
@Bean
RSocketRequester rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
RSocketClientProperties clientProp) {
RSocketRequester rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
.dataMimeType(new MimeType("application", "x-protobuf"))
.connectTcp(clientProp.getHost(), clientProp.getRsocPort()).retry().block();
rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("Connection CLOSED"))
.doFinally(consumer -> log.info("Client DISCONNECTED")).subscribe();
return rsocketRequester;
}
}
Клиент. java
@Service
public class PersonRSocketClient {
@Autowired
private RSocketRequester personClient;
public Flux<Person> list() {
return personClient.route("person").retrieveFlux(Person.class);
}
}
НЕ РАБОТАЕТ
RSocketClientConfig. java
public class RSocketClientConfig {
@Bean
Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
RSocketClientProperties clientProp) {
Mono<RSocketRequester> rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
.dataMimeType(new MimeType("application", "x-protobuf"))
.connectTcp(clientProp.getHost(), clientProp.getRsocPort());
return rsocketRequester;
}
}
Клиент. java
@Service
public class PersonRSocketClient {
@Autowired
private Mono<RSocketRequester> personClient;
public Flux<Person> list() {
return personClient
.flatMapMany(rsocket -> rsocket.route("person").retrieveFlux(Person.class));
}
}
Как правильно сопоставить поток запроса с потоком ?