Spring-AMQP запрос / ответ, как управлять таймаутом? - PullRequest
0 голосов
/ 29 октября 2018

Я использую Spring AMQP с RabbitTemplate в режиме запроса / ответа. У меня есть время ожидания для ответа, и после этого времени я возвращаю ответ своему клиенту. Клиент получает ответ. После этого потребитель заканчивает обработку сообщения, а когда он заканчивает, я получаю исключение, сообщающее, что ответ истек. Это хорошо для меня, потому что я хотел бы сделать другую обработку для сообщений, которые находятся в тайм-ауте (опубликовать в другом обмене), но я не уверен, что является лучшим способом сделать это.

Вот некоторые фрагменты кода:

Шаблон Rabbit:

@Configuration
public class MyRabbitConfiguration extends RabbitConfiguration {

    @Bean
    public RetryRabbitTemplate rabbitTemplate() {
        CachingConnectionFactory connectionFactory = (CachingConnectionFactory) this.connectionFactory();
        connectionFactory.setPublisherReturns(true);
        Message.addWhiteListPatterns(CLASSES_TO_SEND_OVER_RABBITMQ);

        RetryRabbitTemplate rabbitTemplate = new RetryRabbitTemplate (connectionFactory);

        //use default reply to queue amq.rabbitmq.reply-to
        rabbitTemplate.setReplyTimeout(this.timeout);

        return rabbitTemplate;
    }
}

RetryRabbitTemplate:

public class RetryRabbitTemplate extends RabbitTemplate {


    public RetryRabbitTemplate(ConnectionFactory connectionFactory) {
        this.setConnectionFactory(connectionFactory);
    }

    @Override
    public void onMessage(Message message) {
        try {
            super.onMessage(message);
        } catch (Exception e) { //catch AmqpRejectAndDontRequeueException :Reply received after timeout

            try {
                byte[] body = message.getBody();
                SerializerMessageConverter serialize = new SerializerMessageConverter();
                Object objectToSend = serialize.fromMessage(message);
                this.convertAndSend("exchange", "routing_key", objectToSend);
            } catch (Exception e1) {
                LOGGER.error("TODO");
            }
        }
    }
}

Простой потребитель со сном, который больше, чем сон моего получателя, поэтому завершите лечение после того, как отправитель ответит клиенту:

@Component
public class Consumers {

    @RabbitListener(queues = "listner.queue")
    public MyObectResponse Consumers (MyObect message) {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new MyObectResponse ();
    }
}

Я думаю, что это не лучший способ, но я не нашел лучшего решения. Есть кто-то с лучшей идеей? Спасибо

...