Ретранслятор сообщений Spring Boot, похоже, подключен, но сообщения не передаются внешнему (amazonMQ) посреднику - PullRequest
2 голосов
/ 28 июня 2019

Я настраиваю реле внешнего брокера (amazonMQ) для веб-сокета в моем приложении Spring Boot.

При запуске конфигурации, кажется, все в порядке, и реле подключено согласно журналу. Я проверил с плохим URL, и я ясно получаю UnknownHostException, поэтому я думаю, что мои конфигурации хоста хороши.

Я проверил это с помощью локального ActiveMQ, и моя тестовая настройка работает отлично, поэтому нет проблем с потоком сообщений. Я видел подключенных потребителей и созданную тему в консоли управления.

Вот мой код, основанный на Spring Documentation здесь !

Когда я пытаюсь подключиться к клиенту, я получаю сообщение о подключении, за которым следует отключение для того же сеанса.

Если это проблема с SSL, то я не нашел ссылки в Spring Documentation для настройки SSL.


   private static final String HOST = "b-xxxxxxxxxxxxxxxxxx.mq.aws-region.amazonaws.com";
    private static final int PORT = 61617;
    private static final String USER = "username-here";
    private static final String PASSCODE = "passcode-here";

    private ReactorNettyTcpClient<byte[]> createTcpClient() {

        return new ReactorNettyTcpClient(HOST, PORT, new StompReactorNettyCodec());
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay()
                .setRelayHost(HOST)
                .setRelayPort(PORT)
                .setSystemLogin(USER)
                .setSystemPasscode(PASSCODE)
                .setClientLogin(USER)
                .setClientPasscode(PASSCODE)
                .setTcpClient(createTcpClient());


        registry.setApplicationDestinationPrefixes("/app")
                .setPathMatcher(new AntPathMatcher("."));

    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws")
                .setAllowedOrigins("*").withSockJS();
    }

     @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        AuthenticatedUser a = null;
        registration.interceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                log.info(" accessor {}", accessor);

                return message;
            }

        });

Журнал при запуске

     Starting...
     Connecting "system" session to 
      stomp://XXXXXXXXXXXXXXXXX.mq.XXXXXX.amazonaws.com:61617
     Started.
     Tomcat started on port(s): 8080 (http) with context path ''

Журнал, когда клиент пытается подключиться

     accessor StompHeaderAccessor [headers={simpMessageType=CONNECT, 
     stompCommand=CONNECT, nativeHeaders={accept-version=[1.1,1.0], heart-beat=[10000,10000]}, 
     simpSessionAttributes={}, simpHeartbeat=[J@7070e97e, 
     simpSessionId=52usza4t}]

     accessor StompHeaderAccessor [headers={simpMessageType=DISCONNECT, 
     stompCommand=DISCONNECT, simpSessionAttributes={}, 
     simpSessionId=52usza4t}]

Кто-нибудь сталкивался с чем-то подобным? Заранее спасибо:)

1 Ответ

0 голосов
/ 03 июля 2019

Я решил это с правильными версиями библиотеки и конфликтами специально с реактором. Поделиться тем, что сработало для меня.

Сначала я обновил свою весеннюю загрузочную версию до версии 2.1.6.RELEASE с версии 2.0.4

compile 'io.netty:netty-all:4.1.36.Final'

compile 'io.projectreactor:reactor-core:3.2.10.RELEASE'
compile 'io.projectreactor:reactor-net:2.0.5.RELEASE'
compile 'io.projectreactor:reactor-spring-context:2.0.7.RELEASE'
compile 'io.projectreactor.netty:reactor-netty:0.8.9.RELEASE'


// had to add this as application was failing to start complaining about jms 
context.
compile 'javax.jms:javax.jms-api:2.0.1/reactor'

compile group: 'org.springframework', name: 'spring-messaging', version: 
'5.1.8.RELEASE'
compile group: 'org.springframework', name: 'spring-jms', version: 
'5.0.6.RELEASE'

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompReactorNettyCodec;
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;

@Slf4j
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Qualifier
    private AuthenticationManager authenticationManager;

    @Autowired
    private WebSocketSecurityHandler webSocketSecurityHandler;

    @Value("${spring.config.mq.username}")
    private String userName;
    @Value("${spring.config.mq.password}")
    private String password;

    @Value("${spring.config.mq.host}")
    private String host;  // "b-xxxxxx-xxxxx-xxxx-xxxx-xxxxx-x.mq.eu-central-1.amazonaws.com";

    @Value("${spring.config.mq.port}")
    private int port;    // Amazon MQ (ActiveMQ) STOMP port = 61614


    private SocketAddress getAddress() {
        try {
            InetAddress addr = InetAddress.getByName(host);
            SocketAddress sockaddr = new InetSocketAddress(addr, port);
            return sockaddr;
        } catch (UnknownHostException e) {
            log.error("failed to connect");
        }
        return null;
    }

    private ReactorNettyTcpClient<byte[]> createTcpClient() {
        ReactorNettyTcpClient unSecured;

        unSecured = new ReactorNettyTcpClient<>(
                client -> client.addressSupplier(() -> getAddress()).secure(),
                new StompReactorNettyCodec());

        return unSecured;
    }


    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic/", "/queue/")
                .setRelayHost(host)
                .setRelayPort(port)
                .setSystemLogin(userName)
                .setSystemPasscode(password)
                .setClientLogin(userName)
                .setClientPasscode(password)
                .setTcpClient(createTcpClient());


        registry.setApplicationDestinationPrefixes("/app");
        //.setPathMatcher(new AntPathMatcher("."));

    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket");
        registry.addEndpoint("/sockjs")
                .withSockJS();
    }

}
...