весенний интеграционный поток не устанавливает прослушиватель сообщений на receivecontainer - PullRequest
0 голосов
/ 25 мая 2018

Мы используем пружинную интеграцию (4.3.12) вместе с spring-amqp (1.7.4) для отправки и получения сообщений между микро сервисами.Чтобы не допускать интеграцию / настройку amqp из микро сервисов, мы хотим использовать библиотеку, содержащую фабрики интеграции / amqp для создания необходимых объектов.

Что я ожидаю:

Iсоздайте экземпляр IntegrationFlow с помощью метода messageHandler / messageHandler (см. код ниже) и SimpleMessageHandlerContainer.Когда я отправляю сообщение в связанный обмен, IO ожидает, что messageHandler будет вызван с сообщением.

Что я получу:

Исключение: «MessageDispatchingException: у Dispatcher нет подписчиков»

Если я использую MessageListenerContainer напрямую (установите messageHandler прямо в контейнере), я получаю сообщение, как и ожидалось.Я думаю, проблема заключается в программной инициализации интеграционного потока, но я не могу найти никакой информации о том, что я делаю неправильно.Кто-нибудь может дать мне подсказку?Спасибо

Теперь используется код:

public IntegrationFlow createMessageNotifierIntegrationFlow(//
        String brokerNameSpace, String messageHandlerNameSpace, //
        Object messageHandler, String methodName) {
    ConnectionFactory cf = createConnectionFactory(brokerNameSpace);
    AmqpAdmin amqpAdmin = createAmqpAdmin(brokerNameSpace, cf);
    Inbound inbound = createInbound(messageHandlerNameSpace);
    Queue queue = createQueue(messageHandlerNameSpace, amqpAdmin, inbound);

    MessageNotifierIntegrationFlowBuilder builder = MessageNotifierIntegrationFlowBuilder
            .newBuilder(messageHandlerNameSpace, this);
    IntegrationFlow integrationFlow = builder//
            .withConnectionFactory(cf)//
            .withMessageHandler(messageHandler)//
            .withMessageHandlerMethod(methodName)//
            .withFlowExceptionHandler(new FlowExceptionHandler())//
            .withInbound(inbound)//
            .withAmqpAdmin(amqpAdmin)//
            .withInboundQueue(queue)//
            .build();
    String beanName = brokerNameSpace + "-" + messageHandlerNameSpace + "-" + inbound.getQueue();

    return integrationFlow;
}

 public IntegrationFlow build() {
        LOGGER.info("creating IntegrationFlow for {}", inbound.getQueue());
        validateBuilder();
        SimpleMessageListenerContainer receiverContainer = amqpObjectFactory.createMessageListenerContainer(//
                inbound, connectionFactory, //
                flowExceptionHandler, inboundQueue, messageHandlerNameSpace);
        final AmqpInboundChannelAdapterSpec adapter = (AmqpInboundChannelAdapterSpec) Amqp.inboundAdapter(receiverContainer);

        StandardIntegrationFlow flow = IntegrationFlows //
                .from(adapter) //
                .log("receiveData")//
                .transform(TO_STRING_TRANSFORMER) //
                .handle(messageHandler, messageHandlerMethod) //
                .log("to message handler").get();
        // flow.start() maybe later?
        flow.start();
        return flow;
    }

       public SimpleMessageListenerContainer createMessageListenerContainer(//
            final Inbound inbound, //
            final ConnectionFactory connectionFactory, //
            final FlowExceptionHandler flowExceptionHandler, //
            final Queue inboundQueue, String messageHandlerNameSpace) {
        final String beanName = messageHandlerNameSpace + "-container-" + inbound.getQueue();
        SimpleMessageListenerContainer container = null;
        container = new SimpleMessageListenerContainer(connectionFactory);
        container.setMaxConcurrentConsumers(inbound.getMaxconsumers());
        container.setConcurrentConsumers(inbound.getMinconsumers());
        container.setStartConsumerMinInterval(inbound.getMininterval());
        container.addQueues(inboundQueue);
        container.setAcknowledgeMode(inbound.getAckmode());
        container.setDefaultRequeueRejected(inbound.getRequeueRejected());
        container.setErrorHandler(flowExceptionHandler);
        container.setRecoveryInterval(RECOVERY_INTERVAL);
        container.setAutoStartup(false);

        return container;
    }
...