У меня есть поток Spring Integration, который опрашивает папку для файла и затем пытается отправить содержимое файла в тему Kafka.Опросчик настроен на использование менеджера транзакций (PseudoTransactionManager()
), поэтому, если что-то идет не так, файл перемещается в папку «error».
Вот как выглядят потоки:
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
ExpressionParser parser = new SpelExpressionParser();
ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
new ExpressionEvaluatingTransactionSynchronizationProcessor();
syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
syncProcessor.setAfterCommitExpression(parser
.parseExpression("payload.renameTo(new java.io.File(@inboundProcessedDirectory.path " +
" + T(java.io.File).separator + payload.name))"));
syncProcessor.setAfterRollbackExpression(
parser.parseExpression("payload.renameTo(new java.io.File(@inboundFailedDirectory.path " +
" + T(java.io.File).separator + payload.name))"));
return new DefaultTransactionSynchronizationFactory(syncProcessor);
}
public IntegrationFlow inboundFileIntegration(MessageSource<File> fileReadingMessageSource) {
c -> c.id("inboundFileAdapter")
.poller(Pollers.fixedDelay(period)
.taskExecutor(taskExecutor)
.maxMessagesPerPoll(maxMessagesPerPoll)
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.transactional(transactionManager())))
// END POLLER CONFIGURATION
.transform(fileToObjectTransformer)
.channel(INBOUND_BBS_CHANNEL)
.get();
}
@Bean
public IntegrationFlow toKafka(KafkaTemplate<?, ?> kafkaTemplate) {
return IntegrationFlows.from(INBOUND_BBS_CHANNEL)
.handle(Kafka.outboundChannelAdapter(kafkaTemplate)
.topic(this.properties.getEntitlement().getTopic())
.sync(true)
.messageKey(this.properties.getEntitlement().getMessageKey()))
.get();
}
Поток отлично работает в «счастливом» сценарии.Но, например, если я уничтожу кластер Kafka, я бы хотел, чтобы ошибка распространялась назад, и у меня была transactionalManager
, чтобы переместить файл в папку «error» (а также зарегистрировать проблему, используя wiretap
).Вместо этого, когда возникает ошибка, связанная с Kafka, я получаю:
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException:
, а затем:
Caused by: java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String.
Файл, помещенный в исходную папку, не перемещается.Я предполагаю, что проблема связана с тем фактом, что «канал Кафки» является односторонним, поэтому исключение не распространяется обратно в исходный поток, где живет txManager.Как я могу заставить txManager «откатить» файл?
--- ОБНОВЛЕНИЕ ---
Полная трассировка стека
2018-10-04 17:46:32.332 ERROR 7300 --- [ taskExecutor-4] o.s.k.s.LoggingProducerListener : Exception thrown when sending a message with key='bbs.key' and payload='{"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": [{"accountId": "030", "applic...' to topic bbs.topic:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
2018-10-04 17:46:32.349 WARN 7300 --- [ad | producer-1] o.a.k.c.NetworkClient : [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2018-10-04 17:46:32.348 INFO 7300 --- [ taskExecutor-4] ---- DONE ---- : GenericMessage [payload={"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": []
2018-10-04 17:46:32.364 ERROR 7300 --- [ taskExecutor-4] o.s.i.h.LoggingHandler : All attempts to deliver Message to MessageHandlers failed.; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [toKafka.org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms., failedMessage=GenericMessage [payload={"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": [{"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "45T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}]}, headers={errorChannel=error222, file_originalFile=c:\UBS\Dev\tmp\cosima-producer\inbound\read\full-base-lca17_126.txt, id=b28bb489-b62e-6861-5c1b-c4665fd72e62, file_name=full-base-lca17_126.txt, file_relativePath=full-base-lca17_126.txt, timestamp=1538667991063}]. Multiple causes:
error occurred in message handler [toKafka.org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@180d04d9] (inboundFileIntegration.org.springframework.integration.config.ConsumerEndpointFactoryBean#3)]; nested exception is java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String.
See below for the stacktrace of the first cause.org.springframework.messaging.MessageHandlingException: error occurred in message handler [toKafka.org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms., failedMessage=GenericMessage [payload={"messageKey": ]}, headers={errorChannel=error222, file_originalFile=c:\UBS\Dev\tmp\cosima-producer\inbound\read\full-base-lca17_126.txt, id=b28bb489-b62e-6861-5c1b-c4665fd72e62, file_name=full-base-lca17_126.txt, file_relativePath=full-base-lca17_126.txt, timestamp=1538667991063}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:144)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:227)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:290)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:197)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy93.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:391)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:133)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:290)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
... 73 more
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
at org.springframework.kafka.core.KafkaTemplate$1.onCompletion(KafkaTemplate.java:370)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:827)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:363)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:355)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:206)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:257)
... 74 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@180d04d9] (inboundFileIntegration.org.springframework.integration.config.ConsumerEndpointFactoryBean#3)]; nested exception is java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String., failedMessage=GenericMessage [payload={"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": [{"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "45T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}]}, headers={file_originalFile=c:\UBS\Dev\tmp\cosima-producer\inbound\read\full-base-lca17_126.txt, id=3ebb9873-e1c2-8ec3-b541-98106af7bf53, file_name=full-base-lca17_126.txt, file_relativePath=full-base-lca17_126.txt, timestamp=1538667991043}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:144)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:227)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:290)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:197)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
at com.sun.proxy.$Proxy93.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:391)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String.
at org.springframework.integration.handler.ExpressionCommandMessageProcessor.processMessage(ExpressionCommandMessageProcessor.java:78)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
... 73 more