Кэшируются ли сообщения websocket при использовании Spring + SockJS + STOMP?Если нет, как обнаружить разрыв сети? - PullRequest
0 голосов
/ 20 мая 2019

У меня разные части страницы обновляются в разное время сообщениями веб-сокетов. Если по какой-либо причине происходит сбой сетевого подключения к серверу (в течение любого периода от нескольких секунд до нескольких дней), мне нужно вернуть страницу в правильное состояние. Я использую веб-сокеты Spring на бэкенде, а SockJS и STOMP.js на переднем конце (встроенный в Angular).

В 1. Любая часть этого кэша отправляет сообщения веб-сокетов (я использую веб-сокеты только один путь, от сервера к клиенту), а затем обнаруживает сбой сети и отправляет сохраненные сообщения при восстановлении соединения? (поэтому этот сценарий автоматически вернет страницу в правильное состояние)

В 2. В противном случае мне нужно как-то обнаружить потерю сетевого подключения - как это сделать точно? (Затем я запускаю полную перезагрузку страницы из внешнего интерфейса - это бит, который я могу легко сделать)

Мой бэкэнд находится в Groovy, с использованием Spring Websockets, т. Е .:

import org.springframework.messaging.simp.SimpMessagingTemplate
SimpMessagingTemplate brokerMessagingTemplate
brokerMessagingTemplate.convertAndSend('/topic/updatepage', pageComponentMessage)

с этим для конфигурации:

@CompileStatic
@Configuration
@EnableWebSocketMessageBroker
class MySocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry) {
        messageBrokerRegistry.enableSimpleBroker "/queue", "/topic"
        messageBrokerRegistry.setApplicationDestinationPrefixes "/app"
    }

    @Override
    void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        stompEndpointRegistry.addEndpoint("/stomp").setAllowedOrigins("*").withSockJS()
    }

    @Bean
    GrailsSimpAnnotationMethodMessageHandler grailsSimpAnnotationMethodMessageHandler(
        SubscribableChannel clientInboundChannel,
        MessageChannel clientOutboundChannel,
        SimpMessageSendingOperations brokerMessagingTemplate
    ) {
        def handler = new GrailsSimpAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate)
        handler.destinationPrefixes = ["/app"]
        return handler
    }

    @Bean
    GrailsWebSocketAnnotationMethodMessageHandler grailsWebSocketAnnotationMethodMessageHandler(
        SubscribableChannel clientInboundChannel,
        MessageChannel clientOutboundChannel,
        SimpMessageSendingOperations brokerMessagingTemplate
    ) {
        def handler = new GrailsWebSocketAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate)
        handler.destinationPrefixes = ["/app"]
        return handler
    }

}

и угловой код переднего конца:

export class MyWSService {
  private sockjsclient = null; // SockJS socket that connects to the server (preferably using a WebSocket)
  private stompClient = null; // Stomp client that handles sending messages over the WebSocket

  subscribeToTopic(topic: string, subInstance: any, callbackfn): any { 

    // SockJS socket connection does not exist yet, set it up:
    if(!this.sockjsclient) {
      this.sockjsclient = new SockJS(myWebsocketUrl);
    }

    // If STOMP instance (to send messages over the socket) does not exist yet, set it up:
    if(!this.stompClient) {

      this.stompClient = Stomp.over(this.sockjsclient);

      this.stompClient.connect({}, () => {

        subInstance.wsSubscription = this.stompClient.subscribe(topic, (message) => callbackfn(message));
      })
    }
    // STOMP instance already exists, so use that existing connection:
    else {
        subInstance.wsSubscription = this.stompClient.subscribe(topic, (message) => callbackfn(message));
      }
  }

  unsubscribeFromTopic(subscription: any) {
    subscription.unsubscribe(); // Unsubscribe from topic
  }
}

Спасибо

1 Ответ

1 голос
/ 07 июня 2019

Проблема уже на теоретическом уровне. Даже если бы вы использовали кеширование, вы не смогли бы знать, как долго вы должны их хранить.

То, что я сделал в проекте для получения этой функциональности, - это подписка всех пользователей на их выделенную пользовательскую очередь и настройка RabbitMQ таким образом, чтобы срок действия очередей истекал только через определенное время. При таком подходе - а поскольку в пользовательской очереди только один подписчик - сообщение, которое не было доставлено, будет просто оставаться в очереди до тех пор, пока соответствующий пользователь не получит его. На стороне клиента вы можете установить новое соединение через веб-сокет и подписки, как только вы потеряете соединение (из-за отсутствия сердцебиения). И как только вы подпишетесь, вы получите сообщение, которое вы пропустили.

Для настройки пользовательских очередей в RabbitMQ я реализовал ChannelInterceptor, который реагирует на SUBSCRIBE фреймы:

public class SubscribeInterceptor implements ChannelInterceptor {

    private static final int EXPIRE_IN_MILLISECONDS = 30 * 1000;

    @Override
    public Message<?> preSend(@NonNull Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        if (accessor.getCommand() == StompCommand.SUBSCRIBE) {
            String username = accessor.getUser().getName();
            String queueName = Stream.of(accessor.getDestination().split("/"))
                    .reduce((f, s) -> s)
                    .orElseThrow(IllegalStateException::new);

            accessor.setNativeHeader("x-queue-name", String.format("%s_%s", username, queueName));
            accessor.setNativeHeader("x-expires", Integer.toString(EXPIRE_IN_MILLISECONDS));
            accessor.setNativeHeader("durable", "true");
            accessor.setNativeHeader("auto-delete", "false");
            return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
        }
        return message;

    }
}
...