Я хочу настроить брокер сообщений webbocket с пружинной загрузкой так, чтобы он отправлял сообщения в том порядке, в котором они были отправлены.
Основываясь на ответах на аналогичные вопросы , я попытался установить размер пула исполнителей диспетчерских задач равным 1, но я по-прежнему получаю сообщения, отправляемые в неправильном порядке.
В целях отладки я добавил перехватчики каналов до и после отправки, которые регистрируют потоки, в которые отправляются сообщения, и я вижу, что идентификаторы потоков различаются.
Что я делаю не так?
Код (Kotlin):
Конфигурация Websocket:
package foo.bar
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.simp.config.ChannelRegistration
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.messaging.support.ChannelInterceptor
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer{
companion object {
private val LOGGER = LoggerFactory.getLogger(WebSocketConfig::class.java)
}
override fun configureMessageBroker(config: MessageBrokerRegistry) {
config.enableSimpleBroker("/topic")
config.setApplicationDestinationPrefixes("/app");
config.configureBrokerChannel().taskExecutor().corePoolSize(1)
config.configureBrokerChannel().taskExecutor().maxPoolSize(1)
val channelInterceptor: ChannelInterceptor = object: ChannelInterceptor {
override fun preSend(message: Message<*>, channel: MessageChannel): Message<*> {
LOGGER.debug("Message broker sending message on Thread " + Thread.currentThread().id);
return message
}
override fun postSend(message: Message<*>, channel: MessageChannel, sent: Boolean) {
LOGGER.debug("Message broker sent message on Thread " + Thread.currentThread().id);
}
}
config.configureBrokerChannel().interceptors(channelInterceptor)
}
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/ws")
.withSockJS()
}
override fun configureClientOutboundChannel(registration: ChannelRegistration) {
registration.taskExecutor().corePoolSize(1)
registration.taskExecutor().maxPoolSize(1)
}
override fun configureClientInboundChannel(registration: ChannelRegistration) {
registration.taskExecutor().corePoolSize(1)
registration.taskExecutor().maxPoolSize(1)
}
}
(раздетый) код для отправки сообщения:
@Controller
class StateController
@Autowired constructor(
private val template: SimpMessagingTemplate
) {
....
fun publishMsg(topicId: String, msg: MyMessageType){
template.convertAndSend("/topic/msg/"+topicId, msg)
}
}
Вот некоторые примеры регистрации. Как вы можете видеть, исполнитель использует более одного Thread
, или, скорее, существует более одного исполнителя. Кроме того, идентификаторы потоков перемещаются взад и вперед, что для меня выглядит четким подтверждением того, что выполнение диспетчеризации не такое однопоточное, как я ожидал.
Вход:
09:48:12.257 DEBUG [ault-executor-4] channelInterceptor$1.preSend : 32 Message broker sending message on Thread 60
09:48:12.257 DEBUG [ault-executor-0] channelInterceptor$1.preSend : 32 Message broker sending message on Thread 47
09:48:12.257 DEBUG [ault-executor-0] channelInterceptor$1.postSend : 38 Message broker sent message on Thread 47
09:48:12.257 DEBUG [ault-executor-4] channelInterceptor$1.postSend : 38 Message broker sent message on Thread 60