Spring Messaging / ActiveMQ Подтвержденное сообщение не получено - PullRequest
0 голосов
/ 26 июня 2018

Я работал над этой проблемой некоторое время и решил попросить о помощи.

У меня есть следующий сценарий. Сервер ActiveMQ прослушивает порт 61614. Два WebSocketStompClient подключаются к следующим очередям;

Client1: / queue / request / server1 - / queue / replyto / server1

Client2: / queue / request / server2 - / queue / replyto / server2

2 сервера связываются друг с другом и запрашивают информацию. У меня нет проблем с этими сценариями. Отправьте запрос от сервера 1 в очередь сервера 2 и получите ответ в очереди ответов сервера1.

Вроде как: У меня нет 10 репутаций ...

Sending SEND {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], receipt=[3]} etc..

Received MESSAGE {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519] etc...

Здесь выполняется логика случайного приложения ...

Sending SEND {destination=[/queue/replyto/server1] ,session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...
Received MESSAGE {destination=[/queue/replyto/server1], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...

Однако существует проблема, если вы попытаетесь отправить другое сообщение в очередь запросов сервера 1, прежде чем ответить на первый запрос. Ответ отправляется в очередь ответов сервера 2, но никогда не принимается.

Изображение здесь: Никогда не получайте ответ (3. ОТВЕТ) на запрос (2. ЗАПРОС)

Sending SEND { destination=[/queue/replyto/server2], session=[c504b2fe-ae63-1bc2-87ce-651682b7c98e],  receipt=[4], etc.
Received RECEIPT {receipt-id=[4]} session=c256762b-ddef-4109-8a3e-04bde832ed85

Надеюсь, это достаточно ясно, дайте мне знать, если потребуется больше объяснений.

Дополнительная информация: я уверен, что сообщение отправлено так, как оно написано в очереди, и я могу его увидеть.

Очередь

Как вы можете видеть, все сообщения сняты / подтверждены.

Стоит отметить, что все это выполняется локально с помощью сервера DoM ActiveMQ и модульного теста, который имитирует то, что происходит на сервере разработки.

С Wireshark я вижу, что сообщение подтверждается сервером2.

отправить кадр

tcp trace

Наконец:

Это настройка, и она идентична для обоих серверов:

Клиент:

   WebSocketClient transport = new StandardWebSocketClient();
   WebSocketStompClient server1 = new WebSocketStompClient(transport);
   server1.setMessageConverter(new MappingJackson2MessageConverter());

    server1.setTaskScheduler(taskScheduler);
    server1.setDefaultHeartbeat(heartbeat);
    server1.connect(bindAddress, Server1SessionHandler);

Обработчик:

public class Server1SessionHandler extends StompSessionHandlerAdapter {
   @Override
    public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
        logger.debug("Entering RemoteSessionHandler after connected method");
        this.stompSession = session;

    if (session.isConnected()) {
        session.setAutoReceipt(true);

        logger.trace("Attempting to subscribe to channel {} using the RequestFrameHandler ", this.subscribeChannel);
        session.subscribe(this.subscribeChannel, new Server1RequestFrameHandler(session, null, brokerMessageTtl, logicService, guid));

        logger.trace("Attempting to subscribe to channel {} using the ResponseFrameHandler ", this.replyQueue);
        session.subscribe(this.replyQueue, new Server1ResponseFrameHandler(this.cache));

        publisher.publishEvent(new ConnectionSuccessEvent(Server1SessionHandler.class, "RemoteSessionHandler"));
    }
    else {
        logger.error("Could not connect to stomp Session {}", session.toString());
    }
}

}

Запрос обработчика кадра:

public class Server1RequestFrameHandler implements StompFrameHandler {

private StompSession session;

public Server1RequestFrameHandler(final StompSession session) {
    this.session = session;
}

@Override
@SuppressWarnings("static-access")
public void handleFrame(final StompHeaders headers, final Object payload) {
    ...... BUSINESS LOGIC ......

            if (session.isConnected()) {
                session.send(header, response);
                logger.debug("Successfully connected using the stompsession {}, ", session.toString());
            }


}

@Override
public Type getPayloadType(final StompHeaders headers) {
    return Request.class;
}

}

Обработчик фрейма ответа:

public class Server1ResponseFrameHandler implements StompFrameHandler {

private Server1Cache cache;

public Server1ResponseFrameHandler(final Server1Cache cache) {
    this.cache = cache;
}

public void handleFrame(final StompHeaders headers, final Object payload) {
    Response response = (Response) payload;
    logger.debug("response: {}", response);
    cache.cacheMessage(id, response);
}

public Type getPayloadType(final StompHeaders headers) {
    return Response.class;
}

}

Дайте мне знать, если вам требуется дополнительная информация.

1 Ответ

0 голосов
/ 02 июля 2018

Мне удалось обойти это, подписавшись на темы с двумя различными настройками WebSocket Connection. Такие как: Создать запрос клиента

WebSocketClient requestTransport = new StandardWebSocketClient();
WebSocketStompClient requestClient = new WebSocketStompClient(requestTransport);
requestClient.setMessageConverter(new MappingJackson2MessageConverter());

requestClient.setTaskScheduler(taskScheduler);
requestClient.setDefaultHeartbeat(heartbeat);
requestClient.connect(bindAddress, RequestSessionHandler);

Создание отдельного клиента ответа

WebSocketClient responseTransport = new StandardWebSocketClient();
WebSocketStompClient responseClient = new WebSocketStompClient(responseTransport);
responseClient.setMessageConverter(new MappingJackson2MessageConverter());

responseClient.setTaskScheduler(taskScheduler);
responseClient.setDefaultHeartbeat(heartbeat);
responseClient.connect(bindAddress, ResponseSessionHandler);

Где обработчики сеанса запроса и ответа подписываются только на 1 Очередь вместо обоих.

public class RequestSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
    ...

    if (session.isConnected()) {
       session.subscribe("requestChannel", new Server1RequestFrameHandler(session, null, brokerMessageTtl, logicService, guid));
   ....
    }
}
}

Если у кого-то есть объяснение того, почему это происходило при подписке на обе очереди с использованием только 1 клиента WebSocket, я бы с удовольствием это услышал.

Моя теория заключается в том, что поток, обслуживающий соединение с веб-сокетом, был занят запросом и не мог свободно обрабатывать отправленный ответ.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...