Нет простого способа добиться этого. Определение того, заинтересован ли потребитель или нет, предложит некоторые варианты дизайна. Кроме того, для вашей проблемы нет готового решения. Вы можете использовать концепцию пульса для уведомления производителей о состоянии потребителя.
+----------+ +-------------+ +--------+
| |<--Heartbeat---| |---Message--->| |
| Producer | | AMQP Server | |Consumer|
| |----Message--->| |<--Heartbeat--| |
+----------+ +-------------+ +--------+
Ваша установка будет выглядеть примерно так: этот производитель (ms1
) будет отправлять сообщения на сервер AMQP и будет использовать Heartbeat события от потребителя (ов), чтобы определить, жив ли потребитель (ы). Это станет более сложным, если у вас будет более одного производителя / потребителя для одного и того же topi c.
Если у вас будет более одного потребителя, несколько контрольных сигналов будут отправлены в одной теме / очереди. Вам нужно определить, какой из них жив, вам также необходимо учитывать, что потребители идут вниз / вверх.
Каждый тактовый импульс может иметь временную метку, когда он был сгенерирован, чтобы вы могли принять решение о живом потребителе на более детальной уровень. Следующий вопрос: когда отправлять сердцебиение? Поскольку вы используете загрузку Spring, у вас не будет гибкости отправки пульса за пределами прослушивателя сообщений. Если вы используете интерфейс MessageListener
, сделайте что-то вроде этого.
Создайте класс издателя пульса
@Component
class HeartBeatPublisher {
public void registerMessageListener( String listenerName,
String topicOrQueueName,
Long intervals) {
// store this detail in a map
// do lazy init of threads to send heartbeat
}
}
Класс слушателя вашего сообщения хотел бы
@Component
class MyMessageListener implements MessageListener {
@Autowired HeartBitPublisher heartBeatPublisher;
@PostConstruct
public void init(){
// 30 seconds interval
heartBeatPublisher.registerMessageListener("MyMessageListener",
"MyMessageListener",
30*1000 );
}
void onMessage(Message message){
// consume message here
}
}
Если вы не используете MessageListener
, вы все равно можете отправлять биения, в этом случае я бы рекомендовал добавить по одному слушателю на компонент.
@Component
class MyMessageListener{
@Autowired HeartBeatPublisher heartBeatPublisher;
@PostConstruct
public void init(){
// 30 seconds interval
heartBeatPublisher.registerMessageListener("MyMessageListener",
"MyMessageListener",
30*1000 );
}
@RabbitListener(queues="myQueue")
public void onMessage(Object message) {
// Consume message
}
}
Со стороны производителя вам необходимо добавить слушателя (s), чтобы проверить, какой потребитель (-ы) активен / активен, используя тактовый сигнал. Как только вы узнаете, какие потребители являются активными, вы можете использовать это, чтобы ограничить публикацию сообщений sh.
@Component
class HeartBeatListener {
private List<String> queues;
@PostConstruct
public void init(){
// initialize queue and consumer status to inactive
// at certain intervals checks for missing heartbeat
// if you find heartbeats are not being sent that means either
// that consumer has died or there's a delay in heart bit
// publish in that case mark that consumer/topic/queue inactive
}
@RabbitListener(queues="myQueue-HeartBeat")
public void onMessage(Object message) {
// update consumer status
// Record heart beat for a given consumer/topic
}
}
Вместо того, чтобы делать все это, вы можете использовать Consul , Zookeeper или Etcd , чтобы перенести некоторые работы в эти системы.