RabbitMQ Spring «Невозможно определить целевую ConnectionFactory для ключа поиска» при использовании Java lambda parallelStream - PullRequest
1 голос
/ 24 марта 2020

У нас есть приложение Spring Java, использующее RabbitMQ, и вот сценарий:

  • Есть получатель, получающий сообщения из очереди и отправляющий их другой. Мы используем «SimpleRabbitListenerContainerFactory» в качестве фабрики контейнеров, но при отправке сообщений в другую очередь внутри «parallelStream» мы получаем IllegalStateException «Невозможно определить целевой ConnectionFactory для ключа поиска» Исключение
  • Когда мы удаляем «parallelStream», он работает безупречно.
    public void sendMessage(final StagingMessage stagingMessage, final Long timestamp, final String country) {

        final List<TransformedMessage> messages = processMessageList(stagingMessage);

        messages.parallelStream().forEach(message -> {
            final TransformedMessage transformedMessage = buildMessage(timestamp, ApiConstants.POST_METHOD, country);
            myMessageSender.sendQueue(country, transformedMessage);
        });
    }

Connectio Facotory, где установлен ключ поиска:

@Configuration
@EnableRabbit
public class RabbitBaseConfig {

    @Autowired
    private QueueProperties queueProperties;

    @Bean
    @Primary
    public ConnectionFactory connectionFactory(final ConnectionFactory connectionFactoryA, final ConnectionFactory connectionFactoryB) {

        final SimpleRoutingConnectionFactory simpleRoutingConnectionFactory = new SimpleRoutingConnectionFactory();
        final Map<Object, ConnectionFactory> map = new HashMap<>();

        for (final String queue : queueProperties.getAQueueMap().values()) {
            map.put("[" + queue + "]", connectionFactoryA);
        }

        for (final String queue : queueProperties.getBQueueMap().values()) {
            map.put("[" + queue + "]", connectionFactoryB);
        }

        simpleRoutingConnectionFactory.setTargetConnectionFactories(map);
        return simpleRoutingConnectionFactory;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {

        return new Jackson2JsonMessageConverter();
    }
}

1 Ответ

0 голосов
/ 24 марта 2020

Добро пожаловать в переполнение стека!

При задании таких вопросов вы всегда должны показывать соответствующий код и компоненты конфигурации.

Я предполагаю, что вы используете RoutingConnectionFactory.

Он использует ThreadLocal для хранения ключа поиска, поэтому отправка должна происходить в том же потоке, который установил ключ.

Как правило, вы никогда не должны go асинхронный в слушателе; вы рискуете потерять сообщение. Чтобы увеличить параллелизм, используйте свойства параллелизма в контейнере.

РЕДАКТИРОВАТЬ

Один из способов - передать ключ поиска в заголовке сообщения:

    @Bean
    public RabbitTemplate template(ConnectionFactory rcf) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(rcf);
        Expression expression = new SpelExpressionParser().parseExpression("messageProperties.headers['cfSelector']");
        rabbitTemplate.setSendConnectionFactorySelectorExpression(expression);
        return rabbitTemplate;
    }

    @RabbitListener(queues = "foo")
    public void listen1(String in) {
        IntStream.range(0, 10)
            .parallel()
            .mapToObj(i -> in + i)
            .forEach(val -> {
                this.template.convertAndSend("bar", val.toUpperCase(), msg -> {
                    msg.getMessageProperties().setHeader("cfSelector", "[bar]");
                    return msg;
                });
            });
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...