Spring Integration Java DSL: Dispatcher has no subscribers - PullRequest
1 vote
/ November 10, 2019

I get the exception below even after adding a subscriber to my channel, i.e. orderDeliveredChannel.

I am converting an XML configuration to the Spring Integration Java DSL.

Below is the exception I get when the application starts.

2019-11-10 13:47:14.520  INFO 24598 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}]
2019-11-10 13:47:14.522 ERROR 24598 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.orderDeliveredChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}], failedMessage=GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:851)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:498)
    at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:471)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:444)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:318)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:266)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:229)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:133)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:143)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:396)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:380)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:328)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:275)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:272)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=study.pattern.integration.lab9.domain.Order@776f7ea6, headers={sequenceNumber=2, correlationId=4b4dbb88-3fe7-1fa9-81a6-6e438baabac2, id=faada063-924f-f9c9-35f0-9771158e6180, sequenceSize=3, timestamp=1573364834520}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    ... 48 more

Here is my Spring Integration configuration:

package study.pattern.integration.lab9.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageHandler;

import lombok.extern.slf4j.Slf4j;
import study.pattern.integration.lab9.domain.ItemType;
import study.pattern.integration.lab9.domain.Order;
import study.pattern.integration.lab9.domain.OrderItem;
import study.pattern.integration.lab9.service.OrderDelivery;
import study.pattern.integration.lab9.service.OrderItemsProcessor;

@EnableIntegration
@Configuration
@IntegrationComponentScan
@Slf4j
public class IntegrationConfiguration {

    @Autowired
    OrderItemsProcessor processor;

    @Autowired
    OrderDelivery orderDelivery;

    @Bean
    @Qualifier("orderChannel")
    public DirectChannel orderChannel() {
        return MessageChannels.direct().get();
    }   

    @Bean
    @Qualifier("orderItemsChannel")
    public DirectChannel orderItemsChannel() {
        return MessageChannels.direct().get();
    }

    /**
     * QueueChannel: implements PollableChannel.
     * There is one endpoint connected to the channel and no subscribers. Communication is asynchronous:
     * the receiver retrieves the message on a different thread. How it works:
     * 1. The producer sends the message to the channel.
     * 2. The channel queues the message.
     * 3. The consumer actively retrieves the message (active receiver).
     */
    @Bean
    @Qualifier("musicItemsChannel")
    public QueueChannel musicItemsChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    @Qualifier("softwareItemsChannel")
    public QueueChannel softwareItemsChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    @Qualifier("booksItemChannel")
    public QueueChannel booksItemsChannel() {
        return MessageChannels.queue().get();
    }

    @Bean
    @Qualifier("orderItemsProcessed")
    public DirectChannel orderItemsProcessedChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    @Qualifier("orderDelivered")
    public DirectChannel orderDeliveredChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    @ServiceActivator(inputChannel = "myLogChannel")
    public MessageHandler logger() {
         LoggingHandler loggingHandler =  new LoggingHandler(LoggingHandler.Level.INFO.name());
         loggingHandler.setLoggerName("logging");
         return loggingHandler;
    }


    @Bean
    IntegrationFlow processOrderFlow() {
        return IntegrationFlows
                .from(orderChannel())
                    .split(Order.class , a -> a.getOrderItems())
                    .channel(orderItemsChannel())
                    .wireTap(f -> f.handle(logger()))
                    .route(OrderItem.class,
                            o -> o.getType().name(),
                            type -> type.channelMapping(ItemType.BOOK.name(),booksItemsChannel())
                            .channelMapping(ItemType.MUSIC_CD.name(), musicItemsChannel())
                            .channelMapping(ItemType.SOFTWARE.name(), softwareItemsChannel())
                            )
        .get();
    }

    @Bean   
    IntegrationFlow processBooksItemChannel() {
        return IntegrationFlows.from(booksItemsChannel())
            .handle(processor, "processBooksOrderItem", spec -> spec.poller(Pollers.fixedDelay(100L)))
            .channel(orderItemsProcessedChannel())
//          .wireTap(f -> f.handle(logger()))
            .log()
            .get();

    }

    @Bean
    IntegrationFlow processMusicItemChannel() {
        return IntegrationFlows.from(musicItemsChannel())
            .handle(processor, "processMusicOrderItem", spec -> spec.poller(Pollers.fixedDelay(100L)))
            .channel(orderItemsProcessedChannel())
//          .wireTap(f -> f.handle(logger()))
            .log()
            .get();

    }

    @Bean
    IntegrationFlow processSoftwareItemChannel() {
        return IntegrationFlows.from(softwareItemsChannel())
            .handle(processor, "processSoftware", spec -> spec.poller(Pollers.fixedDelay(100L)))
            .channel(orderItemsProcessedChannel())
//          .wireTap(f -> f.handle(logger()))
            .log()
            .get();
    }

    @Bean
    IntegrationFlow aggreateAllProcessedOrderItems() {
        return IntegrationFlows.from(orderItemsProcessedChannel())
                .aggregate(spec -> spec.processor(orderDelivery, "delivery")) 
                .channel(orderDeliveredChannel())
                .handle(m -> log.info("The Payload data {} ",m.getPayload())) 
                .log()
                .get();
    }
}

This is the last step in converting my XML configuration to the Java DSL.

Could someone please help me resolve this issue?

1 Answer

0 votes
/ November 11, 2019

Your problem is here:

            .handle(m -> log.info("The Payload data {} ",m.getPayload())) 
            .log()

You simply cannot put log() at the end of the flow here, because there is no further channel for it. I say this because this handle() is a one-way component: it has nothing to send as a reply, so no output channel will be created for that log interceptor.
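To make the exception itself concrete, here is a minimal plain-Java model of a point-to-point channel that rejects a send when nothing is subscribed. It is only a sketch: SimpleDirectChannel and DirectChannelSketch are hypothetical names, the class is not Spring Integration's DirectChannel, and it always delivers to the first subscriber instead of balancing between subscribers.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Simplified model for illustration; NOT Spring Integration's implementation.
class DirectChannelSketch {

    /** Hypothetical stand-in for a subscribable, point-to-point channel. */
    static final class SimpleDirectChannel {
        private final String name;
        private final List<Consumer<Object>> subscribers = new CopyOnWriteArrayList<>();

        SimpleDirectChannel(String name) {
            this.name = name;
        }

        void subscribe(Consumer<Object> handler) {
            subscribers.add(handler);
        }

        // Delivers on the caller's thread to the first subscriber,
        // or fails if nobody has subscribed yet.
        void send(Object payload) {
            if (subscribers.isEmpty()) {
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'");
            }
            subscribers.get(0).accept(payload);
        }
    }

    public static void main(String[] args) {
        SimpleDirectChannel channel = new SimpleDirectChannel("orderDeliveredChannel");
        try {
            channel.send("order-1"); // nothing is subscribed yet, so this is rejected
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        channel.subscribe(p -> System.out.println("Delivered: " + p));
        channel.send("order-2"); // now there is a consumer
    }
}
```

The point of the model is that a direct channel has no buffer: a message sent while no handler is attached has nowhere to go, so the send fails on the sender's thread.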

It is not clear which Spring Integration version you are using, but the current version has a guard against this misconfiguration:

                throw new BeanCreationException("The 'currentComponent' (" + currComponent +
                        ") is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. " +
                        "This is the end of the integration flow.");

So your mistake would be rejected immediately, at the configuration parsing phase!
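Following that diagnosis, one possible rearrangement of the aggregation flow is sketched below: log() is moved in front of the terminal handle(), so the one-way handler is the last element of the flow. This is a sketch under assumptions, not a verified fix: it assumes Spring Integration 5.x (where IntegrationFlows is available), refers to the channels by the bean names from the question, and uses a hypothetical class name, AggregationFlowSketch. System.out stands in for the Lombok logger.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;

import study.pattern.integration.lab9.service.OrderDelivery;

@Configuration
public class AggregationFlowSketch { // hypothetical class name

    @Bean
    IntegrationFlow aggregateAllProcessedOrderItems(OrderDelivery orderDelivery) {
        return IntegrationFlows
                // Channel bean names are taken from the question's configuration.
                .from("orderItemsProcessedChannel")
                .aggregate(spec -> spec.processor(orderDelivery, "delivery"))
                .channel("orderDeliveredChannel")
                // Logging happens here, while the flow still continues.
                .log()
                // One-way handler: it produces no reply, so it must end the flow.
                .handle(m -> System.out.println("The Payload data " + m.getPayload()))
                .get();
    }
}
```

If the extra logging is not needed, simply dropping the trailing log() from the original flow should have the same effect, since the lambda handler already logs the payload.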
