Многопоточный веб-сокет - PullRequest
0 голосов
/ 08 мая 2020

Я просто хочу, чтобы мой код создавал новый поток для каждого нового подключения к веб-сокету от каждого клиента (тот же поток для CONNECT + SUBSCRIBE + SEND + UNSUBSCRIBE + DISCONNECT). У меня есть конфигурация веб-сокета и обработчик сообщений, как показано ниже, которые не работают. Я имею в виду, даже после добавления getThreadPoolTaskExecutor () и настройки его в configureMessageBroker (). Я не вижу разницы в исполнении веб-сокетов.

Любая помощь или предложение приветствуются.

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {

    private TaskScheduler messageBrokerTaskScheduler;

    @Autowired
    public void setMessageBrokerTaskScheduler(TaskScheduler taskScheduler) {
        this.messageBrokerTaskScheduler = taskScheduler;
    }       

    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxBinaryMessageBufferSize(1024000);
        return container;
    }   

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/sub")
                .setHeartbeatValue(new long[] {10000,5000})
                .setTaskScheduler(this.messageBrokerTaskScheduler);             

        config.configureBrokerChannel().taskExecutor(getThreadPoolTaskExecutor());

        config.setApplicationDestinationPrefixes("/app");
    }

    private ThreadPoolTaskExecutor getThreadPoolTaskExecutor(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setCorePoolSize(75);
        threadPoolTaskExecutor.setQueueCapacity(75);
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.setThreadNamePrefix("Websocket-Handler");
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/baseuri")
                .setAllowedOrigins("*").withSockJS();
    }

    protected void configure(HttpSecurity http) throws Exception {
        http.csrf().disable();
    }

    @Override
    protected boolean sameOriginDisabled() {
        return true;
    }

    @SuppressWarnings("all")
    public void customizeClientInboundChannel(ChannelRegistration registration) {
        registration.setInterceptors(getWebsocketMsgHandler());
    }

    @Bean
    public WebsocketMsgHandler getWebsocketMsgHandler() {
        return new WebsocketMsgHandler();
    }       
}

@Component
public class WebsocketMsgHandler extends ChannelInterceptorAdapter {

public Message<?> preSend(Message<?> message, MessageChannel channel) {
// business logic
}
}
...