У меня есть процесс RPC, который принимает сообщение, отправляет его в очередь для обработки другим процессом и выбрасывает этот ответ в очередь ответов. Потребитель, который захватывает сообщение в моей очереди процессов, делает подтверждение, и все.
Если весь этот процесс занимает слишком много времени, клиенту будет сообщено, что он не работает. Если сообщение не попадет в процесс, оно умрет в очереди процесса с таймаутом.
Однако, если он умирает с потребителем - сообщение попадает в очередь ответов и никогда не истекает. Я думал, что время ожидания сообщения будет продолжаться вместе с сообщением, но это не так.
Как я могу заставить потребителя привязать тайм-аут к ответу?
@RabbitListener(queues="${rabbitmq.main.in.queue}", containerFactory = "createDirectMessageListenerContainerFactory")
public String processMessage(@Payload String messageAsJson, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
try {
// Process message
AbstractMap.SimpleEntry<Boolean, JsonObject> result = messageServices.processMessage(messageAsJson);
// Take the response, generate a JSON object for return. Then ack the message
JsonObject responseObj = ResponseUtils.buildJsonResponseFromSimpleEntry(result);
// Something here to add a timeout for the reply queue?
channel.basicAck(tag, true);
return responseObj.toString();
}
catch(Exception ex) {
logger.fatal("Exception thrown processing message: ", ex);
channel.basicNack(tag, false, true);
return null;
}