Добро пожаловать в переполнение стека!
При задании таких вопросов вы всегда должны показывать соответствующий код и компоненты конфигурации.
Я предполагаю, что вы используете RoutingConnectionFactory
.
Он использует ThreadLocal
для хранения ключа поиска, поэтому отправка должна происходить в том же потоке, который установил ключ.
Как правило, вы никогда не должны go асинхронный в слушателе; вы рискуете потерять сообщение. Чтобы увеличить параллелизм, используйте свойства параллелизма в контейнере.
РЕДАКТИРОВАТЬ
Один из способов - передать ключ поиска в заголовке сообщения:
@Bean
public RabbitTemplate template(ConnectionFactory rcf) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(rcf);
Expression expression = new SpelExpressionParser().parseExpression("messageProperties.headers['cfSelector']");
rabbitTemplate.setSendConnectionFactorySelectorExpression(expression);
return rabbitTemplate;
}
@RabbitListener(queues = "foo")
public void listen1(String in) {
IntStream.range(0, 10)
.parallel()
.mapToObj(i -> in + i)
.forEach(val -> {
this.template.convertAndSend("bar", val.toUpperCase(), msg -> {
msg.getMessageProperties().setHeader("cfSelector", "[bar]");
return msg;
});
});
}