Spring AMQP асинхронные ответы и соотнесите его с запросом - PullRequest
0 голосов
/ 02 июля 2018

Я пытаюсь реализовать шаблон запрос-ответ, используя spring amqp.
Я посмотрел на документы и примеры, но до сих пор не могу понять, как использовать Id корреляции и использовать правильный ответ для запроса в асинхронной среде.

Предположим, у меня есть класс Transporter , который получает запрос, отправляет запрос для некоторого requestQueue, а затем ожидает (блокирует) ответ, используя прослушиватель для некоторого responseQueue, а затем возвращает ответ.

Теперь у меня есть AsyncSocketService , который прослушивает requestQueue, а затем отправляет запрос через сокет на другой сервер, который обрабатывает запрос и возвращает ответ. этот сервер работает асинхронно, поэтому, если приходят два запроса, ответ не обязательно должен быть в том же порядке. Это означает, что ответ от AsyncSocketService управляется в другом потоке, который прослушивает сокет InputStream .

После того, как поток, который прослушивает сокет InputStream , получает ответ, он публикует его в responseQueue, и тогда мой Transporter , который прослушивает responseQueue, может вернуть ответ первоначальному звонящему.

Transporter прослушивает ответ Queue следующим образом:

byte[] response = (byte[]) rabbitTemplate.receiveAndConvert(queueName, timeout);

Но таким образом я не могу убедиться, что данный ответ соответствует правильному запросу.

Я видел несколько примеров использования очереди ответов, когда вы определяете SimpleMessageListenerContainer и RabbitTemplate с адресом ответа. но все же я не понимаю, где идентификатор корреляции вступает в игру и как я могу это проверить. и это решение не подходит для меня, потому что мне нужен мой блок Transporter и жду ответа, соответствующего запросу.

Важно отметить, что запросы и ответы имеют поле refId в байте [], чтобы разрешить сопоставление между запросом и ответом, но я не хочу перехватывать и запрашивать ответы, которые не совпадают.

Может ли кто-нибудь помочь найти решение для моего варианта использования?

Спасибо!

1 Ответ

0 голосов
/ 02 июля 2018

Ваша архитектура немного неясна, и я боюсь, что не могу ответить на ваш вопрос о ручной корреляции напрямую, но для сценариев запрос-ответ есть RabbitTempalte.sendAndReceive(). Вы действительно можете настроить его с фиксированным queue для ответов, если это ваше требование:

/**
 * The name of the default queue to receive messages from when none is specified explicitly.
 *
 * @param queue the default queue name to use for receive
 */
public void setQueue(String queue) {

Существует автоматическая поддержка корреляции, которой вы можете управлять с помощью:

/**
 * Set to true to use correlation id provided by the message instead of generating
 * the correlation id for request/reply scenarios. The correlation id must be unique
 * for all in-process requests to avoid cross talk.
 * <p>
 * <b>Users must therefore take create care to ensure uniqueness.</b>
 * @param userCorrelationId true to use user correlation data.
 */
public void setUserCorrelationId(boolean userCorrelationId)

В то же время вам необходимо настроить RabbitTemplate в качестве прослушивателя для некоторых MessageListenerContainer, чтобы действительно использовать эти асинхронные ответы.

Для истинных асинхронных запросов / ответов также существует вариант AsyncRabbitTemplate, который возвращает RabbitMessageFuture для последующего использования ответа.

В любом случае, очень важно, чтобы потребительская сторона requestQueue поддерживала передачу корреляции в ответ. В противном случае на стороне производителя не будет ничего, что соответствовало бы асинхронному ответу с отправленным запросом.

Вся информация содержится в Справочном руководстве .

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...