Потребление сообщений больше не работает после обновления до Spring Integration 5 - PullRequest
0 голосов
/ 30 апреля 2019

Я пытаюсь обновить проект, используя Spring Integration 4.3 и Spring Boot 1.6, до Spring Integration 5.1 и Spring Boot 2.1. Ранее у меня была следующая конфигурация:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("myId")
                    .autoStartup(autoStartup)
                    .prefetchCount(10)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> a.correlationExpression("payload.entityId")
                                    .releaseExpression("size() eq iterator().next().payload.batchSize")
                                    .sendPartialResultOnExpiry(true)
                                    .groupTimeout(2000)
                                    .expireGroupsUponCompletion(true)
                                    .outputProcessor(myMessageGroupProcessor))
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

В процессе обновления я пытался следовать документации здесь и таким образом изменил конфигурацию на такую:

@Configuration
@EnableAutoConfiguration
@EnableIntegration
public class SpringConfig {

    @Bean(name = "myFlowId")
    public IntegrationFlow myFlow(ConnectionFactory connectionFactory, ServiceActivatorBean serviceActivatorBean,
                                  @Value("${spring.integration.flow.auto-startup:true}") boolean autoStartup,
                                  MyMessageGroupProcessor myMessageGroupProcessor) {
        IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                        .id("myId")
                        .autoStartup(autoStartup)
                        .configureContainer(c -> c.acknowledgeMode(MANUAL)
                            .prefetchCount(10)
                            .concurrentConsumers(2)
                            .maxConcurrentConsumers(3)
                        )
                        .messageConverter(messageConverter()))
                        .aggregate(a -> a.correlationExpression("payload.entityId")
                                        .releaseExpression("size() eq one().payload.batchSize")
                                        .sendPartialResultOnExpiry(true)
                                        .groupTimeout(2000)
                                        .expireGroupsUponCompletion(true)
                                        .outputProcessor(myMessageGroupProcessor))
                        .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                        .get();
    }
}

Но когда я публикую сообщения, не похоже, что они получены / обработаны потоком интеграции. Я не получаю журналы ошибок (или вообще никаких журналов по этому вопросу, даже если я включаю ведение журнала отладки), и я не совсем уверен, с чего начать отладку. Я уверен, что сообщения на самом деле публикуются в RabbitMQ, так что это не проблема. Чего мне не хватать?

1 Ответ

2 голосов
/ 30 апреля 2019

Моя проблема была на самом деле не в Spring Integration, а в изменениях Spring AMQP.Ранее «описатели» могли создаваться так:

@Bean
List<Binding> myBinding() {
    return List.of(<binding1>, <binding2>, ..)
}

, но в Spring AMQP 2.1 это значение должно быть изменено на:

@Bean
Declarables myBinding() {
    return new Declarables(List.of(<binding1>, <binding2>, ..))
}

См. Документацию здесь .

Кстати, мой releaseExpression тоже был не прав, должно быть size() eq one.payload.batchSize.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...