Я просто хочу, чтобы мой код создавал новый поток для каждого нового подключения к веб-сокету от каждого клиента (тот же поток для CONNECT + SUBSCRIBE + SEND + UNSUBSCRIBE + DISCONNECT). У меня есть конфигурация веб-сокета и обработчик сообщений, как показано ниже, которые не работают. Я имею в виду, даже после добавления getThreadPoolTaskExecutor () и настройки его в configureMessageBroker (). Я не вижу разницы в исполнении веб-сокетов.
Любая помощь или предложение приветствуются.
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
private TaskScheduler messageBrokerTaskScheduler;
@Autowired
public void setMessageBrokerTaskScheduler(TaskScheduler taskScheduler) {
this.messageBrokerTaskScheduler = taskScheduler;
}
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxBinaryMessageBufferSize(1024000);
return container;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/sub")
.setHeartbeatValue(new long[] {10000,5000})
.setTaskScheduler(this.messageBrokerTaskScheduler);
config.configureBrokerChannel().taskExecutor(getThreadPoolTaskExecutor());
config.setApplicationDestinationPrefixes("/app");
}
private ThreadPoolTaskExecutor getThreadPoolTaskExecutor(){
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setCorePoolSize(75);
threadPoolTaskExecutor.setQueueCapacity(75);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.setThreadNamePrefix("Websocket-Handler");
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/baseuri")
.setAllowedOrigins("*").withSockJS();
}
protected void configure(HttpSecurity http) throws Exception {
http.csrf().disable();
}
@Override
protected boolean sameOriginDisabled() {
return true;
}
@SuppressWarnings("all")
public void customizeClientInboundChannel(ChannelRegistration registration) {
registration.setInterceptors(getWebsocketMsgHandler());
}
@Bean
public WebsocketMsgHandler getWebsocketMsgHandler() {
return new WebsocketMsgHandler();
}
}
@Component
public class WebsocketMsgHandler extends ChannelInterceptorAdapter {
public Message<?> preSend(Message<?> message, MessageChannel channel) {
// business logic
}
}