WebFlux + RSocket, Как передать поток из RSocket в WebFlux - PullRequest
0 голосов
/ 11 июля 2020

Я пытаюсь использовать WebFlux с RSocket. В примере приложения есть серверные и клиентские приложения. оба работают на WebFlux и RSocket, мой тип связи rsocket - поток запросов. Приложение клиент-сервер отлично работает для пары одновременных запросов, однако, когда я загружаю тест с 1000qps с 8 потоками, запросы начинают зависать. В ходе исследования, приведенного ниже, образец кода прошел нагрузочный тест.

РАБОЧИЙ ОБРАЗЕЦ

RSocketClientConfig. java

public class RSocketClientConfig {

    @Bean
    RSocketRequester rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
            RSocketClientProperties clientProp) {

        RSocketRequester rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort()).retry().block();

        rsocketRequester.rsocket().onClose().doOnError(error -> log.warn("Connection CLOSED"))
                .doFinally(consumer -> log.info("Client DISCONNECTED")).subscribe();
        return rsocketRequester;
    }


}

Клиент. java

@Service
public class PersonRSocketClient {

    @Autowired
    private RSocketRequester personClient;

    public Flux<Person> list() {
        return personClient.route("person").retrieveFlux(Person.class);
    }

}

НЕ РАБОТАЕТ

RSocketClientConfig. java

public class RSocketClientConfig {

    @Bean
    Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder rsocketRequesterBuilder, RSocketStrategies strategies,
        RSocketClientProperties clientProp) {

        
        Mono<RSocketRequester> rsocketRequester = rsocketRequesterBuilder.rsocketStrategies(strategies)
                .dataMimeType(new MimeType("application", "x-protobuf"))
                .connectTcp(clientProp.getHost(), clientProp.getRsocPort());

        return rsocketRequester;
    }
}

Клиент. java

@Service
public class PersonRSocketClient {

    @Autowired
    private Mono<RSocketRequester> personClient;

    public Flux<Person> list() {
        return personClient
                .flatMapMany(rsocket -> rsocket.route("person").retrieveFlux(Person.class));
    }

}

Как правильно сопоставить поток запроса с потоком ?

...