Я получаю приведенное ниже исключение даже после добавления подписчика в мой канал, т.е. orderDeliveredChannel.
Я преобразую конфигурацию XML в Java DSL Spring Integration.
Ниже приведено исключение, которым я являюсьПолучение при запуске приложения.
2019-11-10 13:47:14.520 INFO 24598 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}]
2019-11-10 13:47:14.522 ERROR 24598 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.orderDeliveredChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}], failedMessage=GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:851)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:498)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:471)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:133)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:396)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:380)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 48 more
Вот моя весенняя конфигурация интеграции
package study.pattern.integration.lab9.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import study.pattern.integration.lab9.domain.ItemType;
import study.pattern.integration.lab9.domain.Order;
import study.pattern.integration.lab9.domain.OrderItem;
import study.pattern.integration.lab9.service.OrderDelivery;
import study.pattern.integration.lab9.service.OrderItemsProcessor;
@EnableIntegration
@Configuration
@IntegrationComponentScan
@Slf4j
public class IntegrationConfiguration {
@Autowired
OrderItemsProcessor processor;
@Autowired
OrderDelivery orderDelivery;
@Bean
@Qualifier("orderChannel")
public DirectChannel orderChannel() {
return MessageChannels.direct().get();
}
@Bean
@Qualifier("orderItemsChannel")
public DirectChannel orderItemsChannel() {
return MessageChannels.direct().get();
}
/**
*
*
* QueueChannel: Implements PollableChannel.
* There’s one endpoint connected to the channel, no subscribers. This communication is asynchronous;
* the receiver will retrieve the message through a different thread. How it works:
The producer sends the message to the channel.
The channel queues the message.
The consumer actively retrieves the message (active receiver).
* @return
*/
@Bean
@Qualifier("musicItemsChannel")
public QueueChannel musicItemsChannel() {
return MessageChannels.queue().get();
}
@Bean
@Qualifier("softwareItemsChannel")
public QueueChannel softwareItemsChannel() {
return MessageChannels.queue().get();
}
@Bean
@Qualifier("booksItemChannel")
public QueueChannel booksItemsChannel() {
return MessageChannels.queue().get();
}
@Bean
@Qualifier("orderItemsProcessed")
public DirectChannel orderItemsProcessedChannel() {
return MessageChannels.direct().get();
}
@Bean
@Qualifier("orderDelivered")
public DirectChannel orderDeliveredChannel() {
return MessageChannels.direct().get();
}
@Bean
@ServiceActivator(inputChannel = "myLogChannel")
public MessageHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler(LoggingHandler.Level.INFO.name());
loggingHandler.setLoggerName("logging");
return loggingHandler;
}
@Bean
IntegrationFlow processOrderFlow() {
return IntegrationFlows
.from(orderChannel())
.split(Order.class , a -> a.getOrderItems())
.channel(orderItemsChannel())
.wireTap(f -> f.handle(logger()))
.route(OrderItem.class,
o -> o.getType().name(),
type -> type.channelMapping(ItemType.BOOK.name(),booksItemsChannel())
.channelMapping(ItemType.MUSIC_CD.name(), musicItemsChannel())
.channelMapping(ItemType.SOFTWARE.name(), softwareItemsChannel())
)
.get();
}
@Bean
IntegrationFlow processBooksItemChannel() {
return IntegrationFlows.from(booksItemsChannel())
.handle(processor,"processBooksOrderItem",spec -> spec.poller(Pollers.fixedDelay(100l)))
.channel(orderItemsProcessedChannel())
// .wireTap(f -> f.handle(logger()))
.log()
.get();
}
@Bean
IntegrationFlow processMusicItemChannel() {
return IntegrationFlows.from(musicItemsChannel())
.handle(processor,"processMusicOrderItem",spec -> spec.poller(Pollers.fixedDelay(100l)))
.channel(orderItemsProcessedChannel())
// .wireTap(f -> f.handle(logger()))
.log()
.get();
}
@Bean
IntegrationFlow processSoftwareItemChannel() {
return IntegrationFlows.from(softwareItemsChannel())
.handle(processor, "processSoftware", spec -> spec.poller(Pollers.fixedDelay(100l)))
.channel(orderItemsProcessedChannel())
// .wireTap(f -> f.handle(logger()))
.log()
.get();
}
@Bean
IntegrationFlow aggreateAllProcessedOrderItems() {
return IntegrationFlows.from(orderItemsProcessedChannel())
.aggregate(spec -> spec.processor(orderDelivery, "delivery"))
.channel(orderDeliveredChannel())
.handle(m -> log.info("The Payload data {} ",m.getPayload()))
.log()
.get();
}
}
Я на последнем шаге, чтобы закончить настройку XML в Java DSL.
Может кто-нибудь, пожалуйста, помогите, как решить эту проблему.