веб-сокет Spring Boot Rabbit MQ клиент не получает сообщения по темам, на которые он подписан - PullRequest
0 голосов
/ 10 марта 2019

Привет. Я пытаюсь создать веб-сокет с загрузочной пружиной с rabbitmq, stomp

.

Я вижу, что клиент подключается к серверу, но после подключенный метод также не вызывается, когда клиент отправляет сообщение сервер получает его, но клиент никогда не получает сообщение по теме подписался. Это работало с нашим брокером rabbitmq

Вот код:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/server").withSockJS();
  }


  @Override
  public void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry){
    messageBrokerRegistry.setApplicationDestinationPrefixes("/app");
    messageBrokerRegistry.enableStompBrokerRelay("/topic","/queue")
            .setRelayHost("localhost")
            .setRelayPort(61613)
            .setClientLogin("guest")
            .setClientPasscode("guest");

  }
}

@Controller
public class WebSocketController {
      @MessageMapping("/user/{userid}")
      @SendTo("/browser")
      public MovieDevice send(
          @DestinationVariable("userid")  Long userId, MovieDevice message)
          throws Exception {
        message.setMediaServerUrl("shalu");
        return message;
      }
 }

 public MovieDevice publishUrlToClient(MovieDevice movieDevice,MovieData movieData) {
    movieDevice.setMediaServerUrl("test");
    movieDevice.setDeviceId(movieDevice.getDeviceId());
    movieDevice.setMovieId(movieData.getMovie().getId());
    movieDevice.setUserId(movieDevice.getUserId());

    template.convertAndSend("/topic","AKASH");
    //template.convertAndSend("/topic/"+movieDevice.getDeviceId()+"/movie",movieDevice);
    //template.convertAndSend("/topic/browser","Request sent successfully");
    return movieDevice;
  }
`

клиент веб-сокета Java:

public class HelloClient {
    private static Logger logger = Logger.getLogger(HelloClient.class);
    private final static WebSocketHttpHeaders headers = new WebSocketHttpHeaders();

    public ListenableFuture<StompSession> connect() {
        Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
        List<Transport> transports = Collections.singletonList(webSocketTransport);
        SockJsClient sockJsClient = new SockJsClient(transports);
        sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
        String url = "ws://{host}:{port}/server";
        headers.add("user-name","akash");
        headers.add("Authorization","Bearer f33b9cae-0936-4e64-8547-9fa301ea49d6");
        return stompClient
            .connect(url, headers, new MyHandler(),
                "localhost", 8888);
    }

    public void subscribeGreetings(StompSession stompSession) throws ExecutionException, InterruptedException {
        stompSession.subscribe("/topic", new StompFrameHandler() {
            public Type getPayloadType(StompHeaders stompHeaders) {
                return byte[].class;
            }

            public void handleFrame(StompHeaders stompHeaders, Object o) {
                logger.info("Received greeting " + new String((byte[]) o));
            }
        });

        Long temp =new Long(1);
        stompSession.subscribe("/topic/2/movie", new StompFrameHandler() {
            public Type getPayloadType(StompHeaders stompHeaders) {
                return byte[].class;
            }

            public void handleFrame(StompHeaders stompHeaders, Object o) {
                logger.info("Received greeting " + new String((byte[]) o));
            }
        });
    }

    public void sendHello(StompSession stompSession) {
        //String jsonHello = "{ \"name\" : \"Nick\" }";
        MovieDevice movieDevice = new MovieDevice();
        movieDevice.setDeviceId(1l);
        movieDevice.setUserId(1l);
        movieDevice.setMovieId(1l);
        String jsonHello = "{ \"movie-id\" : 3,"
            + "\"device-id\" : 1,\"user-id\" : 1 }";
        stompSession.send("/app/user/1", jsonHello.getBytes());
    }

    public void sendHello1(StompSession stompSession,StompHeaders stompHeaders) {
        //String jsonHello = "{ \"name\" : \"Nick\" }";
        MovieDevice movieDevice = new MovieDevice();
        movieDevice.setDeviceId(1l);
        movieDevice.setUserId(1l);
        movieDevice.setMovieId(1l);
        String jsonHello = "{\"device-id\" : 1,\"user-id\" : 1 }";
        stompHeaders.setDestination("/app/init");
        stompSession.send("/app/init", jsonHello.getBytes());
    }

    private class MyHandler extends StompSessionHandlerAdapter {
        public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
            logger.info("Now connected");
            sendHello1(stompSession,stompHeaders);
        }
    }

    public static void main(String[] args) throws Exception {
        HelloClient helloClient = new HelloClient();
        ListenableFuture<StompSession> f = helloClient.connect();
        StompSession stompSession = f.get();
        logger.info("Subscribing to greeting topic using session " + stompSession);
        helloClient.subscribeGreetings(stompSession);

        logger.info("Sending hello message" + stompSession);
        helloClient.sendHello(stompSession);
        Thread.sleep(60000);
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...