Я новичок в rabbitmq и в настоящее время пытаюсь реализовать неблокирующего производителя с неблокирующим потребителем. Я создал тестового продюсера, где я поигрался с typereference:
@Service
public class Producer {
@Autowired
private AsyncRabbitTemplate asyncRabbitTemplate;
public <T extends RequestEvent<S>, S> RabbitConverterFuture<S> asyncSendEventAndReceive(final T event) {
return asyncRabbitTemplate.convertSendAndReceiveAsType(QueueConfig.EXCHANGE_NAME, event.getRoutingKey(), event, event.getResponseTypeReference());
}
}
И в другом месте тестовая функция, которая вызывается в RestController
@Autowired
Producer producer;
public void test() throws InterruptedException, ExecutionException {
TestEvent requestEvent = new TestEvent("SOMEDATA");
RabbitConverterFuture<TestResponse> reply = producer.asyncSendEventAndReceive(requestEvent);
log.info("Hello! The Reply is: {}", reply.get());
}
До сих пор это было довольно просто: сейчас я застрял в том, как создать потребителя, который тоже не блокирует. Мой текущий слушатель:
@RabbitListener(queues = QueueConfig.QUEUENAME)
public TestResponse onReceive(TestEvent event) {
Future<TestResponse> replyLater = proccessDataLater(event.getSomeData())
return replyLater.get();
}
Насколько я знаю, при использовании @RabbitListener этот слушатель работает в своем собственном потоке. И я мог бы настроить MessageListener для использования более одного потока для активных слушателей. Из-за этого блокирование потока слушателя с помощью future.get () не блокирует само приложение. Тем не менее может быть случай, когда все потоки сейчас блокируются, а новые события помещаются в очередь, когда им, возможно, это не нужно. Я хотел бы просто получить событие без необходимости мгновенно возвращать результат. Что, вероятно, невозможно с @RabbitListener. Что-то вроде:
@RabbitListener(queues = QueueConfig.QUEUENAME)
public void onReceive(TestEvent event) {
/*
* Some fictional RabbitMQ API call where i get a ReplyContainer which contains
* the CorrelationID for the event. I can call replyContainer.reply(testResponse) later
* in the code without blocking the listener thread
*/
ReplyContainer replyContainer = AsyncRabbitTemplate.getReplyContainer()
// ProcessDataLater calls reply on the container when done with its action
proccessDataLater(event.getSomeData(), replyContainer);
}
Как лучше всего реализовать такое поведение с rabbitmq весной?
РЕДАКТИРОВАТЬ Класс конфигурации:
@Configuration
@EnableRabbit
public class RabbitMQConfig implements RabbitListenerConfigurer {
public static final String topicExchangeName = "exchange";
@Bean
TopicExchange exchange() {
return new TopicExchange(topicExchangeName);
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost("localhost");
return connectionFactory;
}
@Bean
public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
return new MappingJackson2MessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate() {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory());
rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
return rabbitTemplate;
}
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate() {
return new AsyncRabbitTemplate(rabbitTemplate());
}
@Bean
public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
Queue queue() {
return new Queue("test", false);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("foo.#");
}
@Bean
public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setMaxConcurrentConsumers(5);
factory.setMessageConverter(producerJackson2MessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
@Override
public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
registrar.setContainerFactory(myRabbitListenerContainerFactory());
}
}