Как отправитель узнает, что получатель недоступен в JMS с помощью Spring Boot? - PullRequest
0 голосов
/ 19 июня 2020

У меня есть приложение Spring Boot, есть 2 микросервиса, и эти микросервисы обмениваются данными асинхронно с помощью JMS и ActiveMq. Таким образом, отправитель (ms1) отправляет сообщение получателю (ms2), отправитель помещает сообщение в очередь, и если получатель недоступен, сообщение останется в очереди, пока получатель не станет доступен.

Я хочу спросить вас, как Отправитель может узнать, доступен Получатель или нет? Я хочу знать это, поскольку я хочу использовать Hystrix, и если получатель доступен, отправитель покажет такое сообщение: «Транзакция успешно завершена!», Но если получатель недоступен, отправитель покажет другое сообщение, что-то вроде это: «Служба получателя в настоящее время недоступна, сообщение добавляется в очередь и будет отправлено получателю, когда оно станет доступным».

Этот код взят из службы отправителя:

    @HystrixCommand(fallbackMethod="sendMessageToProducerFail")
    private ResponseEntity sendMessageToProducer(String jsonStr) {
        jmsTemplate.convertAndSend(queue, jsonStr);
        return new ResponseEntity("Transaction successfully completed!", HttpStatus.CREATED);
    }

    private ResponseEntity sendMessageToProducerFail(String jsonStr) {
//        "The Receiver service isn't currently availble, the message is added to the queue..."
    }

1 Ответ

0 голосов
/ 20 июня 2020

Нет простого способа добиться этого. Определение того, заинтересован ли потребитель или нет, предложит некоторые варианты дизайна. Кроме того, для вашей проблемы нет готового решения. Вы можете использовать концепцию пульса для уведомления производителей о состоянии потребителя.

+----------+               +-------------+              +--------+
|          |<--Heartbeat---|             |---Message--->|        |
| Producer |               | AMQP Server |              |Consumer|
|          |----Message--->|             |<--Heartbeat--|        |  
+----------+               +-------------+              +--------+

Ваша установка будет выглядеть примерно так: этот производитель (ms1) будет отправлять сообщения на сервер AMQP и будет использовать Heartbeat события от потребителя (ов), чтобы определить, жив ли потребитель (ы). Это станет более сложным, если у вас будет более одного производителя / потребителя для одного и того же topi c.

Если у вас будет более одного потребителя, несколько контрольных сигналов будут отправлены в одной теме / очереди. Вам нужно определить, какой из них жив, вам также необходимо учитывать, что потребители идут вниз / вверх.

Каждый тактовый импульс может иметь временную метку, когда он был сгенерирован, чтобы вы могли принять решение о живом потребителе на более детальной уровень. Следующий вопрос: когда отправлять сердцебиение? Поскольку вы используете загрузку Spring, у вас не будет гибкости отправки пульса за пределами прослушивателя сообщений. Если вы используете интерфейс MessageListener, сделайте что-то вроде этого.

Создайте класс издателя пульса

@Component
class HeartBeatPublisher {
   public void registerMessageListener( String listenerName,
                                        String topicOrQueueName,
                                        Long intervals) {
   // store this detail in a map 
   // do lazy init of threads to send heartbeat 
   }
}

Класс слушателя вашего сообщения хотел бы

@Component
class MyMessageListener implements MessageListener {
   @Autowired HeartBitPublisher heartBeatPublisher;
   @PostConstruct
   public void init(){
      // 30 seconds interval
      heartBeatPublisher.registerMessageListener("MyMessageListener", 
                                                 "MyMessageListener",
                                                 30*1000 );
   }
   void onMessage(Message message){
     // consume message here
   }
}

Если вы не используете MessageListener, вы все равно можете отправлять биения, в этом случае я бы рекомендовал добавить по одному слушателю на компонент.

@Component
class MyMessageListener{
   @Autowired HeartBeatPublisher heartBeatPublisher;
   @PostConstruct
   public void init(){
     // 30 seconds interval
     heartBeatPublisher.registerMessageListener("MyMessageListener", 
                                                "MyMessageListener",
                                                30*1000 );
   }

   @RabbitListener(queues="myQueue")
   public void onMessage(Object message) {
    // Consume message
   }
}  

Со стороны производителя вам необходимо добавить слушателя (s), чтобы проверить, какой потребитель (-ы) активен / активен, используя тактовый сигнал. Как только вы узнаете, какие потребители являются активными, вы можете использовать это, чтобы ограничить публикацию сообщений sh.

@Component
class HeartBeatListener {
  private List<String> queues;
   @PostConstruct 
   public void init(){
      // initialize queue and consumer status to inactive
      // at certain intervals checks for missing heartbeat 
      // if you find heartbeats are not being sent that means either 
      // that consumer has died or there's a delay in heart bit 
      // publish in that case mark that consumer/topic/queue inactive
   }
  
   @RabbitListener(queues="myQueue-HeartBeat")
   public void onMessage(Object message) {
     // update consumer status 
     // Record heart beat for a given consumer/topic
   }
}  

Вместо того, чтобы делать все это, вы можете использовать Consul , Zookeeper или Etcd , чтобы перенести некоторые работы в эти системы.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...