Я работал над этой проблемой некоторое время и решил попросить о помощи.
У меня есть следующий сценарий. Сервер ActiveMQ прослушивает порт 61614. Два WebSocketStompClient подключаются к следующим очередям;
Client1:
/ queue / request / server1 - / queue / replyto / server1
Client2:
/ queue / request / server2 - / queue / replyto / server2
2 сервера связываются друг с другом и запрашивают информацию.
У меня нет проблем с этими сценариями.
Отправьте запрос от сервера 1 в очередь сервера 2 и получите ответ в очереди ответов сервера1.
Вроде как: У меня нет 10 репутаций ...
Sending SEND {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], receipt=[3]} etc..
Received MESSAGE {destination=[/queue/request/server2], session=[0d2573e2-079e-ad9c-71df-9274eeba2519] etc...
Здесь выполняется логика случайного приложения ...
Sending SEND {destination=[/queue/replyto/server1] ,session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...
Received MESSAGE {destination=[/queue/replyto/server1], session=[0d2573e2-079e-ad9c-71df-9274eeba2519], etc...
Однако существует проблема, если вы попытаетесь отправить другое сообщение в очередь запросов сервера 1, прежде чем ответить на первый запрос. Ответ отправляется в очередь ответов сервера 2, но никогда не принимается.
Изображение здесь: Никогда не получайте ответ (3. ОТВЕТ) на запрос (2. ЗАПРОС)
Sending SEND { destination=[/queue/replyto/server2], session=[c504b2fe-ae63-1bc2-87ce-651682b7c98e], receipt=[4], etc.
Received RECEIPT {receipt-id=[4]} session=c256762b-ddef-4109-8a3e-04bde832ed85
Надеюсь, это достаточно ясно, дайте мне знать, если потребуется больше объяснений.
Дополнительная информация: я уверен, что сообщение отправлено так, как оно написано в очереди, и я могу его увидеть.
Очередь
Как вы можете видеть, все сообщения сняты / подтверждены.
Стоит отметить, что все это выполняется локально с помощью сервера DoM ActiveMQ и модульного теста, который имитирует то, что происходит на сервере разработки.
С Wireshark я вижу, что сообщение подтверждается сервером2.
отправить кадр
tcp trace
Наконец:
Это настройка, и она идентична для обоих серверов:
Клиент:
WebSocketClient transport = new StandardWebSocketClient();
WebSocketStompClient server1 = new WebSocketStompClient(transport);
server1.setMessageConverter(new MappingJackson2MessageConverter());
server1.setTaskScheduler(taskScheduler);
server1.setDefaultHeartbeat(heartbeat);
server1.connect(bindAddress, Server1SessionHandler);
Обработчик:
public class Server1SessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(final StompSession session, final StompHeaders connectedHeaders) {
logger.debug("Entering RemoteSessionHandler after connected method");
this.stompSession = session;
if (session.isConnected()) {
session.setAutoReceipt(true);
logger.trace("Attempting to subscribe to channel {} using the RequestFrameHandler ", this.subscribeChannel);
session.subscribe(this.subscribeChannel, new Server1RequestFrameHandler(session, null, brokerMessageTtl, logicService, guid));
logger.trace("Attempting to subscribe to channel {} using the ResponseFrameHandler ", this.replyQueue);
session.subscribe(this.replyQueue, new Server1ResponseFrameHandler(this.cache));
publisher.publishEvent(new ConnectionSuccessEvent(Server1SessionHandler.class, "RemoteSessionHandler"));
}
else {
logger.error("Could not connect to stomp Session {}", session.toString());
}
}
}
Запрос обработчика кадра:
public class Server1RequestFrameHandler implements StompFrameHandler {
private StompSession session;
public Server1RequestFrameHandler(final StompSession session) {
this.session = session;
}
@Override
@SuppressWarnings("static-access")
public void handleFrame(final StompHeaders headers, final Object payload) {
...... BUSINESS LOGIC ......
if (session.isConnected()) {
session.send(header, response);
logger.debug("Successfully connected using the stompsession {}, ", session.toString());
}
}
@Override
public Type getPayloadType(final StompHeaders headers) {
return Request.class;
}
}
Обработчик фрейма ответа:
public class Server1ResponseFrameHandler implements StompFrameHandler {
private Server1Cache cache;
public Server1ResponseFrameHandler(final Server1Cache cache) {
this.cache = cache;
}
public void handleFrame(final StompHeaders headers, final Object payload) {
Response response = (Response) payload;
logger.debug("response: {}", response);
cache.cacheMessage(id, response);
}
public Type getPayloadType(final StompHeaders headers) {
return Response.class;
}
}
Дайте мне знать, если вам требуется дополнительная информация.