Как обработать исключение тайм-аута сети с rabbit mq при асинхронной отправке сообщений с использованием библиотеки spring-amqp - PullRequest
0 голосов
/ 05 февраля 2020

Я написал программу, которая требует взаимодействия нескольких очередей - означает, что потребитель одной очереди записывает сообщение в другую очередь, и у той же программы есть потребитель, который должен выполнить действие в этой очереди. Проблема: Как обработать проблемы тайм-аута сети с очередью при асинхронной отправке сообщений с использованием библиотеки весеннего кролика ampq? Или функция RabbitTemplate.send () должна выдать исключение, если возникают проблемы с сетью. В настоящее время я реализовал RabbitTemplate.send (), который сразу возвращается и работает нормально. Но, если сеть не работает, функция send возвращается немедленно, не выдает никаких исключений и клиентский код предполагает успех. В результате у меня непоследовательное состояние в БД, что сообщение успешно обрабатывается. Обратите внимание, что функция вызова для отправки заключена в транзакционный блок, и цель состоит в том, что в случае сбоя при записи в очередь фиксация БД также должна выполнить откат. Я изучаю следующие решения, но безуспешно:

  1. Можем ли мы настроить rabbitTemplate для выдачи исключения во время выполнения, если возникнет какая-либо проблема с сетевым подключением, чтобы клиентский вызов был уведомлен? Пожалуйста, предложите, как это сделать.

  2. Должны ли мы использовать синхронный вызов функции SendAndReceive, но это приводит к задержке обработки? Другая проблема, наблюдаемая с этой функцией, заключается в том, что мой потребительский код получает уведомление, в то время как функция sendAndReceive по-прежнему заблокирована для записи сообщения в очередь. Пожалуйста, сообщите, если мы можем отложить уведомление в очередь, если функция sendAndReceive не будет возвращена. Но вызов SendAndReceive () вызывал исключение amqp, если сеть не работала, что мы смогли перехватить, но это связано с затратами, связанными с производительностью. Мое приложение является многопоточным, если несколько потоков отправляют сообщения с помощью sendAndReceive (), как библиотека spring-amqp управляет связью очереди? Внутренне создает канал на запрос? Если сообщения доставляются по одному и тому же каналу, это сильно повлияет на производительность многопоточного приложения.

  3. Может ли кто-нибудь поделиться примером кода для использования функции SendAndReceive с рекомендациями?

  4. Есть ли в библиотеке spring-amqp какая-либо функция для проверки работоспособности сервера RabbitMQ перед отправкой вызова функции send? Я исследовал rabbitTemplate.isRunning (), но не получил должного результата. Если требуется какая-либо конкретная c конфигурация, пожалуйста, предложите.

  5. Любое другое решение, чтобы рассмотреть вопрос о гарантированной доставке сообщений или обработать проблемы тайм-аута сети, чтобы выбросить исключения времени выполнения клиенту.


Согласно комментарию Гэри ниже, я установил: rabbitTemplate.setChannelTransacted (true); и это делает вызов Syn c. Следующая часть проблемы заключается в том, что если у меня есть блок транзакции во внешнем блоке, вызов RabbitTemplate.send () немедленно возвращается. Я ожидаю, что блок транзакции внешней функции должен ждать возврата внутренней функции, в противном случае, я не получу ожидаемый результат, поскольку изменения в моей БД сохраняются, хотя мы включили setChannelTransacted в значение true. Я пробовал разные уровни распространения транзакций, но безуспешно. Пожалуйста, сообщите, если я делаю что-то не так, и просмотрите параметры распространения транзакций, как показано ниже:

@Transactional
    public void notifyQueueAndDB(DBRequest dbRequest) {
        logger.info("Updating Request in DB");
        dbService.updateRequest(dbRequest));
        //Below is call to RabbitMQ library 
        mqService.sendmessage(dbRequest); //If sendMessage fails because of network outage, I want DB commit also to be rolled-back.
    } 

MQService, определенный в другой библиотеке проекта, фрагмент ниже.

@Transactional( propagation =  Propagation.NESTED)
 private void sendMessage(......) {
     ....
       rabbitTemplate.send(this.queueExchange, queueName, amqpMessage);

 }catch (Exception exception) {
    throw exception          
}

1 Ответ

1 голос
/ 05 февраля 2020
  1. Включить транзакции, чтобы отправка была синхронной.

или

Используйте Подтверждение издателя и дождитесь получения подтверждения.

Любой из них будет немного медленнее.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...