Мы используем пружинную интеграцию (4.3.12) вместе с spring-amqp (1.7.4) для отправки и получения сообщений между микро сервисами.Чтобы не допускать интеграцию / настройку amqp из микро сервисов, мы хотим использовать библиотеку, содержащую фабрики интеграции / amqp для создания необходимых объектов.
Что я ожидаю:
Iсоздайте экземпляр IntegrationFlow с помощью метода messageHandler / messageHandler (см. код ниже) и SimpleMessageHandlerContainer.Когда я отправляю сообщение в связанный обмен, IO ожидает, что messageHandler будет вызван с сообщением.
Что я получу:
Исключение: «MessageDispatchingException: у Dispatcher нет подписчиков»
Если я использую MessageListenerContainer напрямую (установите messageHandler прямо в контейнере), я получаю сообщение, как и ожидалось.Я думаю, проблема заключается в программной инициализации интеграционного потока, но я не могу найти никакой информации о том, что я делаю неправильно.Кто-нибудь может дать мне подсказку?Спасибо
Теперь используется код:
public IntegrationFlow createMessageNotifierIntegrationFlow(//
String brokerNameSpace, String messageHandlerNameSpace, //
Object messageHandler, String methodName) {
ConnectionFactory cf = createConnectionFactory(brokerNameSpace);
AmqpAdmin amqpAdmin = createAmqpAdmin(brokerNameSpace, cf);
Inbound inbound = createInbound(messageHandlerNameSpace);
Queue queue = createQueue(messageHandlerNameSpace, amqpAdmin, inbound);
MessageNotifierIntegrationFlowBuilder builder = MessageNotifierIntegrationFlowBuilder
.newBuilder(messageHandlerNameSpace, this);
IntegrationFlow integrationFlow = builder//
.withConnectionFactory(cf)//
.withMessageHandler(messageHandler)//
.withMessageHandlerMethod(methodName)//
.withFlowExceptionHandler(new FlowExceptionHandler())//
.withInbound(inbound)//
.withAmqpAdmin(amqpAdmin)//
.withInboundQueue(queue)//
.build();
String beanName = brokerNameSpace + "-" + messageHandlerNameSpace + "-" + inbound.getQueue();
return integrationFlow;
}
public IntegrationFlow build() {
LOGGER.info("creating IntegrationFlow for {}", inbound.getQueue());
validateBuilder();
SimpleMessageListenerContainer receiverContainer = amqpObjectFactory.createMessageListenerContainer(//
inbound, connectionFactory, //
flowExceptionHandler, inboundQueue, messageHandlerNameSpace);
final AmqpInboundChannelAdapterSpec adapter = (AmqpInboundChannelAdapterSpec) Amqp.inboundAdapter(receiverContainer);
StandardIntegrationFlow flow = IntegrationFlows //
.from(adapter) //
.log("receiveData")//
.transform(TO_STRING_TRANSFORMER) //
.handle(messageHandler, messageHandlerMethod) //
.log("to message handler").get();
// flow.start() maybe later?
flow.start();
return flow;
}
public SimpleMessageListenerContainer createMessageListenerContainer(//
final Inbound inbound, //
final ConnectionFactory connectionFactory, //
final FlowExceptionHandler flowExceptionHandler, //
final Queue inboundQueue, String messageHandlerNameSpace) {
final String beanName = messageHandlerNameSpace + "-container-" + inbound.getQueue();
SimpleMessageListenerContainer container = null;
container = new SimpleMessageListenerContainer(connectionFactory);
container.setMaxConcurrentConsumers(inbound.getMaxconsumers());
container.setConcurrentConsumers(inbound.getMinconsumers());
container.setStartConsumerMinInterval(inbound.getMininterval());
container.addQueues(inboundQueue);
container.setAcknowledgeMode(inbound.getAckmode());
container.setDefaultRequeueRejected(inbound.getRequeueRejected());
container.setErrorHandler(flowExceptionHandler);
container.setRecoveryInterval(RECOVERY_INTERVAL);
container.setAutoStartup(false);
return container;
}