Откат транзакции в канале - PullRequest
0 голосов
/ 04 октября 2018

У меня есть поток Spring Integration, который опрашивает папку для файла и затем пытается отправить содержимое файла в тему Kafka.Опросчик настроен на использование менеджера транзакций (PseudoTransactionManager()), поэтому, если что-то идет не так, файл перемещается в папку «error».

Вот как выглядят потоки:

  @Bean
  public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionParser parser = new SpelExpressionParser();
    ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
        new ExpressionEvaluatingTransactionSynchronizationProcessor();
    syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
    syncProcessor.setAfterCommitExpression(parser
        .parseExpression("payload.renameTo(new java.io.File(@inboundProcessedDirectory.path " +
            " + T(java.io.File).separator + payload.name))"));
    syncProcessor.setAfterRollbackExpression(
        parser.parseExpression("payload.renameTo(new java.io.File(@inboundFailedDirectory.path " +
            " + T(java.io.File).separator + payload.name))"));
    return new DefaultTransactionSynchronizationFactory(syncProcessor);
  }


public IntegrationFlow inboundFileIntegration(MessageSource<File> fileReadingMessageSource) {

    c -> c.id("inboundFileAdapter")
            .poller(Pollers.fixedDelay(period)   
            .taskExecutor(taskExecutor) 
            .maxMessagesPerPoll(maxMessagesPerPoll)
              .transactionSynchronizationFactory(transactionSynchronizationFactory()) 
                .transactional(transactionManager())))  
        // END POLLER CONFIGURATION
        .transform(fileToObjectTransformer)         
        .channel(INBOUND_BBS_CHANNEL)
        .get();
}



  @Bean
  public IntegrationFlow toKafka(KafkaTemplate<?, ?> kafkaTemplate) {
    return IntegrationFlows.from(INBOUND_BBS_CHANNEL)
          .handle(Kafka.outboundChannelAdapter(kafkaTemplate)

              .topic(this.properties.getEntitlement().getTopic())
            .sync(true)
            .messageKey(this.properties.getEntitlement().getMessageKey()))
        .get();
  }

Поток отлично работает в «счастливом» сценарии.Но, например, если я уничтожу кластер Kafka, я бы хотел, чтобы ошибка распространялась назад, и у меня была transactionalManager, чтобы переместить файл в папку «error» (а также зарегистрировать проблему, используя wiretap).Вместо этого, когда возникает ошибка, связанная с Kafka, я получаю:

Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException:

, а затем:

Caused by: java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String.

Файл, помещенный в исходную папку, не перемещается.Я предполагаю, что проблема связана с тем фактом, что «канал Кафки» является односторонним, поэтому исключение не распространяется обратно в исходный поток, где живет txManager.Как я могу заставить txManager «откатить» файл?

--- ОБНОВЛЕНИЕ ---

Полная трассировка стека

2018-10-04 17:46:32.332 ERROR 7300 --- [ taskExecutor-4] o.s.k.s.LoggingProducerListener          : Exception thrown when sending a message with key='bbs.key' and payload='{"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": [{"accountId": "030", "applic...' to topic bbs.topic:

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.

2018-10-04 17:46:32.349  WARN 7300 --- [ad | producer-1] o.a.k.c.NetworkClient                    : [Producer clientId=producer-1] Connection to node -1 could not be established. Broker may not be available.
2018-10-04 17:46:32.348  INFO 7300 --- [ taskExecutor-4] ---- DONE ----                           : GenericMessage [payload={"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": []
2018-10-04 17:46:32.364 ERROR 7300 --- [ taskExecutor-4] o.s.i.h.LoggingHandler                   : All attempts to deliver Message to MessageHandlers failed.; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [toKafka.org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms., failedMessage=GenericMessage [payload={"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": [{"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "45T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}]}, headers={errorChannel=error222, file_originalFile=c:\UBS\Dev\tmp\cosima-producer\inbound\read\full-base-lca17_126.txt, id=b28bb489-b62e-6861-5c1b-c4665fd72e62, file_name=full-base-lca17_126.txt, file_relativePath=full-base-lca17_126.txt, timestamp=1538667991063}]. Multiple causes:
    error occurred in message handler [toKafka.org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
    error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@180d04d9] (inboundFileIntegration.org.springframework.integration.config.ConsumerEndpointFactoryBean#3)]; nested exception is java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String.
See below for the stacktrace of the first cause.org.springframework.messaging.MessageHandlingException: error occurred in message handler [toKafka.org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms., failedMessage=GenericMessage [payload={"messageKey": ]}, headers={errorChannel=error222, file_originalFile=c:\UBS\Dev\tmp\cosima-producer\inbound\read\full-base-lca17_126.txt, id=b28bb489-b62e-6861-5c1b-c4665fd72e62, file_name=full-base-lca17_126.txt, file_relativePath=full-base-lca17_126.txt, timestamp=1538667991063}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:144)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:227)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:290)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:197)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy93.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:391)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:206)
    at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:133)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:290)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    ... 73 more
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
    at org.springframework.kafka.core.KafkaTemplate$1.onCompletion(KafkaTemplate.java:370)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:827)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:768)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:363)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:355)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:206)
    at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:257)
    ... 74 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 1000 ms.
org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.ExpressionCommandMessageProcessor@180d04d9] (inboundFileIntegration.org.springframework.integration.config.ConsumerEndpointFactoryBean#3)]; nested exception is java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String., failedMessage=GenericMessage [payload={"messageKey": "98d34db2-28c4-4b24-a867-5f16c4e662f6", "entitlements": [{"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T005917"}, {"accountId": "030", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "30T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "45T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}, {"accountId": "046", "applicationId": "PAT", "accessRightType": "R2", "accessRight": "ALOE-SAP", "instanceCode": "46T", "environment": "T011302"}]}, headers={file_originalFile=c:\UBS\Dev\tmp\cosima-producer\inbound\read\full-base-lca17_126.txt, id=3ebb9873-e1c2-8ec3-b541-98106af7bf53, file_name=full-base-lca17_126.txt, file_relativePath=full-base-lca17_126.txt, timestamp=1538667991043}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:144)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:227)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:290)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:197)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:294)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    at com.sun.proxy.$Proxy93.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.lambda$run$0(AbstractPollingEndpoint.java:391)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Message payload must be an Expression instance or an expression String.
    at org.springframework.integration.handler.ExpressionCommandMessageProcessor.processMessage(ExpressionCommandMessageProcessor.java:78)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    ... 73 more
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...