Добавьте неблокирующего потребителя, используя AsyncRabbitTemplate - PullRequest
0 голосов
/ 30 марта 2020

Я новичок в Rabbitmq и хотел бы создать неблокирующего потребителя. я следовал Как построить неблокирующий потребитель при использовании AsyncRabbitTemplate с шаблоном запроса / ответа , создал приложение и развернул его в локальной системе. Но я получаю AmqpReplyTimeoutException.

Здесь я прилагаю некоторый журнал:


2020-03-30 23:13:33,781 DEBUG [main] RabbitTemplate []>: Publishing message [(Body:'test' MessageProperties [headers={}, correlationId=a6a94f00-c5a3-4f41-a5b9-69293bc6d153, replyTo=amq.rabbitmq.reply-to, contentType=text/plain, contentEncoding=UTF-8, contentLength=4, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [], routingKey = [tax.webflux.reactor.determine]

AmqpReplyTimeoutException [Reply timed out, requestMessage=(Body:'test' MessageProperties [headers={}, correlationId=a6a94f00-c5a3-4f41-a5b9-69293bc6d153, replyTo=amq.rabbitmq.reply-to, contentType=text/plain, contentEncoding=UTF-8, contentLength=4, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])]
    at org.springframework.amqp.rabbit.AsyncRabbitTemplate$RabbitFuture$TimeoutTask.run(AsyncRabbitTemplate.java:757)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Вот мой класс:


import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;

import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitConverterFuture;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

import com.rabbitmq.client.Channel;

@ComponentScan("com.sap.slh.tax.*")
@SpringBootApplication
@EnableAsync
public class SpringwebfluxdemoApplication {
    private final ExecutorService exec = Executors.newCachedThreadPool();

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Bean
    public ApplicationRunner runner(AsyncRabbitTemplate asyncTemplate) {
        return args -> {
            RabbitConverterFuture<Object> future = asyncTemplate.convertSendAndReceive("tax.webflux.reactor.determine", "test");
            future.addCallback(r -> {
                System.out.println("Reply: " + r);
            }, t -> {
                t.printStackTrace();
            });
        };
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate template) {
        return new AsyncRabbitTemplate(template);
    }

    @PostConstruct
    public void initializeQueue() {
        TopicExchange taxServiceTopicExchange = (TopicExchange) ExchangeBuilder.topicExchange("tax.webflux.reactor.TAXSERVICE")
                .durable(true).build();
        Queue taxAttributesDeterminationQueue = QueueBuilder.durable("tax.webflux.reactor.queue").build();
        Binding binding = BindingBuilder.bind(taxAttributesDeterminationQueue).to(taxServiceTopicExchange)
                .with("tax.webflux.reactor.determine");
        amqpAdmin.declareExchange(taxServiceTopicExchange);
        amqpAdmin.declareQueue(taxAttributesDeterminationQueue);
        amqpAdmin.declareBinding(binding);
    }

    @RabbitListener(queues = "tax.webflux.reactor.queue")
    public void listen(String in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            @Header(AmqpHeaders.CORRELATION_ID) String correlationId,
            @Header(AmqpHeaders.REPLY_TO) String replyTo) {

        ListenableFuture<String> future = handleInput(in);
        future.addCallback(result -> {
            Address address = new Address(replyTo);
            this.template.convertAndSend(address.getExchangeName(), address.getRoutingKey(), result, m -> {
                m.getMessageProperties().setCorrelationId(correlationId);
                return m;
            });
            try {
                channel.basicAck(tag, false);
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }, t -> {
            t.printStackTrace();
        });
    }

    private ListenableFuture<String> handleInput(String in) {
        SettableListenableFuture<String> future = new SettableListenableFuture<String>();
        exec.execute(() -> {
            try {
                Thread.sleep(2000);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            future.set(in.toUpperCase());
        });
        return future;
    }


    public static void main(String[] args) {
        SpringApplication.run(SpringwebfluxdemoApplication.class, args);
    }

}

Мой файл application.yml:

---
spring:
  rabbitmq:
    host: "${vcap.services.rabbitmq.credentials.hostname:localhost}"
    password: "${vcap.services.rabbitmq.credentials.password:guest}"
    port: "${vcap.services.rabbitmq.credentials.port:5672}"
    username: "${vcap.services.rabbitmq.credentials.username:guest}"
    virtual_host: "${vcap.services.rabbitmq.credentials.virtual_host:/}"

Любая помощь, чтобы решить эту проблему будет принята. Заранее спасибо.

1 Ответ

0 голосов
/ 30 марта 2020

Этому ответу 18 месяцев; фреймворк теперь поддерживает метод слушателя, возвращающий Mono<?> или Future<?>.

См. Асинхронный тип возврата @RabbitListener .

Тогда вам не нужно все это код в слушателе - просто заполните возвращаемое значение.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...