Я использую Spring AMQP с RabbitTemplate в режиме запроса / ответа.
У меня есть время ожидания для ответа, и после этого времени я возвращаю ответ своему клиенту. Клиент получает ответ. После этого потребитель заканчивает обработку сообщения, а когда он заканчивает, я получаю исключение, сообщающее, что ответ истек. Это хорошо для меня, потому что я хотел бы сделать другую обработку для сообщений, которые находятся в тайм-ауте (опубликовать в другом обмене), но я не уверен, что является лучшим способом сделать это.
Вот некоторые фрагменты кода:
Шаблон Rabbit:
@Configuration
public class MyRabbitConfiguration extends RabbitConfiguration {
@Bean
public RetryRabbitTemplate rabbitTemplate() {
CachingConnectionFactory connectionFactory = (CachingConnectionFactory) this.connectionFactory();
connectionFactory.setPublisherReturns(true);
Message.addWhiteListPatterns(CLASSES_TO_SEND_OVER_RABBITMQ);
RetryRabbitTemplate rabbitTemplate = new RetryRabbitTemplate (connectionFactory);
//use default reply to queue amq.rabbitmq.reply-to
rabbitTemplate.setReplyTimeout(this.timeout);
return rabbitTemplate;
}
}
RetryRabbitTemplate:
public class RetryRabbitTemplate extends RabbitTemplate {
public RetryRabbitTemplate(ConnectionFactory connectionFactory) {
this.setConnectionFactory(connectionFactory);
}
@Override
public void onMessage(Message message) {
try {
super.onMessage(message);
} catch (Exception e) { //catch AmqpRejectAndDontRequeueException :Reply received after timeout
try {
byte[] body = message.getBody();
SerializerMessageConverter serialize = new SerializerMessageConverter();
Object objectToSend = serialize.fromMessage(message);
this.convertAndSend("exchange", "routing_key", objectToSend);
} catch (Exception e1) {
LOGGER.error("TODO");
}
}
}
}
Простой потребитель со сном, который больше, чем сон моего получателя, поэтому завершите лечение после того, как отправитель ответит клиенту:
@Component
public class Consumers {
@RabbitListener(queues = "listner.queue")
public MyObectResponse Consumers (MyObect message) {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new MyObectResponse ();
}
}
Я думаю, что это не лучший способ, но я не нашел лучшего решения.
Есть кто-то с лучшей идеей?
Спасибо