Как обеспечить строгий порядок прослушивания сообщений Rabbit MQ в Spring Integration? - PullRequest
0 голосов
/ 29 апреля 2020

У меня есть проект Spring Integration, в котором я отправляю и получаю сообщения из очереди RabbitMQ.

Порядок, в котором система публикует сообщения, в порядке, но порядок, в котором она впоследствии получает сообщения, неверен.

Итак, я нашел этот абзац (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp -strict-ordering ) и настроил слушателя следующим образом: simpleMessageListenerContainer.setPrefetchCount(1);.

У нас было несколько тестов, и он хорошо работал. Однако примерно через неделю он начал вызывать похожие проблемы с упорядочением.

Позвольте мне объяснить немного подробнее:

У меня есть два потока (IntegrationFlow s) в одном приложении интеграции пружин.

В первом IntegrationFlow он создает сообщения и публикует каждое сообщение в очереди кроликов.

Непосредственно перед публикацией оно регистрирует каждое сообщение, и я могу подтвердить, что sequenceNumber увеличивается, как и ожидалось (в моем случае 1,2,3,4,5,6,7,8,9,10, 11).

Затем во втором потоке расходуются эти опубликованные сообщения. Сразу после получения каждого сообщения поток снова регистрирует его. Здесь я обнаружил, что sequenceNumber не увеличивается, как ожидалось (в моем случае 1,3,5,7,2,4,6,8,9,10,11).

Это очень для этого приложения важно обрабатывать сообщения в правильном порядке.

Когда я посмотрел на пользовательский интерфейс кролика, я обнаружил следующее (большинство из них - то, что я ожидаю):

  • кролик имеет 3 соединения (для приложений 3 java)
  • соединение для моего приложения имеет 3 канала. 2 из них бездействуют / не имеют потребителей, 1 имеет 6 подписчиков и количество предварительных выборок равно 1.
  • каждый подписчик имеет количество предварительных выборок 1
  • . Я обеспокоен только одним из этих подписчиков (очередь).
  • эта очередь имеет свойства «ack required», а не «exclusive».

Я не ожидал, что в моем приложении будет 3 канала. Я не настраивал это сам, может быть, Spring Integration / AMQP сделал это для меня.

Теперь я думаю, что другой канал может стать активным, и это вызывает проблему упорядочения. Но я не могу найти это в журнале. И не в конфигурации.

Куски кода:

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(final ConnectionFactory connectionFactory,
                                                                         final Jackson2JsonMessageConverter jackson2MessageConverter,
                                                                         final MethodInterceptor retryInterceptor) {
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
        simpleMessageListenerContainer.setMessageConverter(jackson2MessageConverter);
        simpleMessageListenerContainer.setAdviceChain(retryInterceptor);
        // force FIFO ordering (https://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-strict-ordering):
        simpleMessageListenerContainer.setPrefetchCount(1);
        simpleMessageListenerContainer.setConcurrency();
        return simpleMessageListenerContainer;
    }

    @Bean
    public IntegrationFlow routeIncomingAmqpMessagesFlow(final SimpleMessageListenerContainer simpleMessageListenerContainer,
                                                         final Queue q1, final Queue q2, final Queue q3,
                                                         final Queue q4, final Queue q5,
                                                         final Queue q6) {
        simpleMessageListenerContainer.setQueues(q1, q2, q3, q4, q5, q6);
        return IntegrationFlows.from(
                Amqp.inboundAdapter(simpleMessageListenerContainer)
                        .messageConverter(jackson2MessageConverter))
                .log(LoggingHandler.Level.DEBUG, "com.my.thing")
                .headerFilter(MyMessageHeaders.QUEUE_ROUTING_KEY)
                .route(router())
                .get();
    }

    private HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter(AmqpHeaders.CONSUMER_QUEUE);
        router.setChannelMapping(AmqpConfiguration.Q1_NAME, Q1_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q2_NAME, Q2_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q3_NAME, Q3_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q4_NAME, Q4_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q5_NAME, Q5_CHANNEL);
        router.setChannelMapping(AmqpConfiguration.Q6_NAME, Q6_CHANNEL);
        router.setResolutionRequired(false);
        router.setDefaultOutputChannelName("errorChannel");
        return router;
    }

publi sh:

    @Bean
    public IntegrationFlow prepareForUpload(final Handler1 handler1) {
        BinaryFileSplitter binaryFileSplitter = new BinaryFileSplitter(true);
        binaryFileSplitter.setChunkSize(chunksize);

        return IntegrationFlows
                .from(aFlow)
                .handle(handler1)
                .split(binaryFileSplitter)
                .log(LoggingHandler.Level.TRACE, "com.my.log.identifyer")
                // Send message to the correct AMQP queue after successful processing
                .enrichHeaders(h -> h.header(QUEUE_ROUTING_KEY, AmqpConfiguration.Q4_NAME))
                .channel(MyChannels.AMQP_OUTPUT)
                .get();
    }
    @Bean
    public IntegrationFlow outputAmqpFlow(final AmqpTemplate amqpTemplate, final UpdateDb updateDb) {
        return IntegrationFlows.from(MyChannels.AMQP_OUTPUT)
                .log(LoggingHandler.Level.DEBUG, "com.my.log.identify")
                .handle(updateDb)
                .handle(Amqp.outboundAdapter(amqpTemplate)
                        .exchangeName(AmqpConfiguration.THE_TOPIC_EXCHANGE)
                        .routingKeyExpression("headers['queueRoutingKey']"))
                .get();
    }

получить:

    @Bean
    public IntegrationFlow handleReceivedMessages() {
        return IntegrationFlows
                .from(Q4_CHANNEL)
                .log(LoggingHandler.Level.DEBUG, "com.my.log.identifyer")
                .handle(..)
                .aggregate(a -> a.releaseStrategy(new ChunkReleaseStrategy()))
                .transform(..)
                ....(..)..
                ...

1 Ответ

0 голосов
/ 29 апреля 2020

Как обсуждалось в документации, на которую вы указали, вам нужно добавить BoundRabbitChannelAdvice к разделителю, чтобы весь нисходящий поток использовал один и тот же канал.

        @Bean
        public IntegrationFlow flow(RabbitTemplate template) {
            return IntegrationFlows.from(Gate.class)
                    .split(s -> s.delimiters(",")
                            .advice(new BoundRabbitChannelAdvice(template)))
                    .<String, String>transform(String::toUpperCase)
                    .handle(Amqp.outboundAdapter(template).routingKey("rk"))
                    .get();
        }
...