Диспетчер не смог доставить сообщение (Spring Integration) - PullRequest
0 голосов
/ 10 сентября 2018

Я разрабатываю своё первое потоковое приложение на Spring Integration. Вкратце, моя цель — получить JSON-файлы из архива tar.gz. Я закончил, и всё работает хорошо, но в конце шлюз выбрасывает исключение вместо того, чтобы нормально завершиться.

Контекст таков:

<!-- Default poller, used by every polling endpoint that does not declare its own poller.
     fixed-delay="0" leaves no pause between the end of one poll and the start of the next. -->
<int:poller default="true" fixed-delay="0" />

<!-- Messaging gateway exposed through the ExtractedFolderProcess interface:
     requests are sent to inputChannelDatasetPath, replies are expected on endFlow,
     and errors are routed to error_channel. -->
<int:gateway id="extractedFolderProcess"
             service-interface="com.javarticles.spring.integration.utilities.ExtractedFolderProcess"
             default-request-channel="inputChannelDatasetPath"
             default-reply-channel="endFlow"
             error-channel="error_channel"/>

<!-- Channel configured as the error-channel of the extractedFolderProcess gateway. -->
<int:channel id="error_channel"/>

<!-- Filter subscribed to error_channel; the accept/reject decision is delegated to the errorFlow bean.
     NOTE(review): no output-channel is declared, so any message the filter accepts can only be
     routed via a replyChannel header. Confirm this is intended. -->
<int:filter id="nullMessage" input-channel="error_channel" ref="errorFlow" />


<!-- Entry channel of the flow: the gateway's default-request-channel and the input of unzipSplitter.
     No queue is declared, so it is a direct (subscribable) channel. -->
<int:channel id="inputChannelDatasetPath"/>

<!-- Final channel of the flow: output of the last aggregator and the gateway's default-reply-channel.
     Also a direct channel (no queue). -->
<int:channel id="endFlow" />

<!-- Queue-backed (pollable) channel between unzipSplitter and testSplitter. -->
<int:channel id="filesIn" >
    <int:queue />
</int:channel>

<!-- Queue-backed (pollable) channel between testSplitter and the first aggregator. -->
<int:channel id="jsonChannel">
    <int:queue />
</int:channel>

<!-- Queue-backed (pollable) channel between the first and the second aggregator. -->
<int:channel id="extractedJsonChannel">
    <int:queue />
</int:channel>

<!-- Two-thread executor; referenced by the poller of testSplitter. -->
<task:executor id="executor" pool-size="2"/>

<!-- Stage 1: the unziperDataset bean splits each message arriving on inputChannelDatasetPath;
     the resulting messages are queued on filesIn. -->
<int:splitter id="unzipSplitter"
              input-channel="inputChannelDatasetPath"
              output-channel="filesIn"
              ref="unziperDataset"/>

<!-- Stage 2: the jsonSplitter bean consumes filesIn through its own poller
     (fixed delay of 1000, run on "executor") and emits to jsonChannel. -->
<int:splitter id="testSplitter"
              input-channel="filesIn"
              output-channel="jsonChannel"
              ref="jsonSplitter">
    <int:poller task-executor="executor" fixed-delay="1000"/>
</int:splitter>

<!-- Stage 3: aggregates messages from jsonChannel by calling jsonAggregator.add;
     no custom correlation or release strategy is configured. Output is queued on extractedJsonChannel. -->
<int:aggregator input-channel="jsonChannel"
                output-channel="extractedJsonChannel"
                ref="jsonAggregator"
                method="add"/>

<!-- Stage 4: aggregates messages from extractedJsonChannel by calling sumAggregator.add, using the
     custom correlation and release strategy beans. The result is sent to endFlow, the gateway's reply channel.
     NOTE(review): the failed message in the reported stack trace carries no replyChannel header;
     confirm that the custom splitter/aggregator beans preserve the incoming message headers. -->
<int:aggregator input-channel="extractedJsonChannel"
                output-channel="endFlow"
                ref="sumAggregator"
                method="add"
                correlation-strategy="correlationIdStrategySumAggregator"
                release-strategy="releaseStrategySumAggregator"/>

<!-- Application beans referenced by the endpoints of this flow. -->
<!-- Referenced by the nullMessage filter on error_channel. -->
<bean id="errorFlow" class="com.javarticles.spring.integration.endPoints.ErrorFlow" />
<!-- Release and correlation strategies of the second (sumAggregator) aggregator. -->
<bean id="releaseStrategySumAggregator" class="com.javarticles.spring.integration.utilities.ReleaseStrategySumAggregator" />
<bean id="correlationIdStrategySumAggregator" class="com.javarticles.spring.integration.utilities.CorrelationIdStrategySumAggregator" />
<!-- Referenced by testSplitter. -->
<bean id="jsonSplitter" class="com.javarticles.spring.integration.endPoints.JsonSplitter" />
<!-- Referenced by unzipSplitter. -->
<bean id="unziperDataset" class="com.javarticles.spring.integration.endPoints.UnzipDataset" />
<!-- Referenced by the first aggregator (jsonChannel to extractedJsonChannel). -->
<bean id="jsonAggregator" class="com.javarticles.spring.integration.endPoints.JsonAggregator"/>
<!-- Referenced by the second aggregator (extractedJsonChannel to endFlow). -->
<bean id="sumAggregator" class="com.javarticles.spring.integration.endPoints.SumAggregator" />

А трассировка стека (stack trace) показывает следующее:

2018-09-10 14:28:36 ERROR LoggingHandler:192 - org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=15878, headers={sequenceNumber=5875, correlationId=0, check=0, id=06f00498-e3ea-2dd0-e5e0-5663974e0e45, sequenceSize=5875, timestamp=1536582516348}]
at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.completeGroup(AbstractCorrelatingMessageHandler.java:671)
at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:418)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:129)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:287)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 33 more

Возможно, сообщение, отправленное вторым агрегатором, имеет неправильные заголовки, но я не уверен. Как я могу это исправить?

1 Ответ

0 голосов
/ 10 сентября 2018

В вашей конфигурации есть несколько проблем. Ваши filesIn, jsonChannel и extractedJsonChannel — всё это опрашиваемые (pollable) каналы, однако единственный опрашивающий потребитель, которого я вижу, — это testSplitter. Поэтому я не совсем понимаю, как агрегаторы вообще получают сообщения.

В качестве отправной точки я бы предложил заменить эти каналы на DirectChannel (то есть удалить из них элемент <int:queue/>).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...