Я новичок в весенней интеграции Kafka и весеннем облачном потоке данных.поэтому моя проблема заключается в том, что я пытаюсь создать http-пост с обменом запросами / ответами, используя http inboundgateway
, чтобы получить следующий поток
:requestchannel > processor > replychannel
Мой текущий код выглядит так:
public class HttpSourceConfiguration {
@Autowired
private Source channels;
@Autowired
private HttpSourceProperties properties;
@Bean
public HttpRequestHandlingEndpointSupport httpSourceString() {
return buildHttpRequestHandlerEndpointSpec("text/*", "application/json")
.requestPayloadType(String.class)
.get();
}
//@Bean
public HttpRequestHandlingEndpointSupport httpSourceBytes() {
return buildHttpRequestHandlerEndpointSpec("*/*")
.get();
}
interface OutBandGateWayChannel {
String REPLY = "reply";
@Input(REPLY)
SubscribableChannel reply();
}
private HttpRequestHandlerEndpointSpec buildHttpRequestHandlerEndpointSpec(final String... consumes) {
return Http.inboundGateway(this.properties.getPathPattern()).
mappedRequestHeaders(this.properties.getMappedRequestHeaders())
.statusCodeExpression(new ValueExpression<>(HttpStatus.ACCEPTED))
.requestMapping(requestMapping ->
requestMapping.methods(HttpMethod.POST)
.consumes(consumes))
.crossOrigin(crossOrigin ->
crossOrigin.origin(this.properties.getCors().getAllowedOrigins())
.allowedHeaders(this.properties.getCors().getAllowedHeaders())
.allowCredentials(this.properties.getCors().getAllowCredentials()))
.requestChannel(this.channels.output() )
.replyChannel(OutBandGateWayChannel.REPLY)
.replyTimeout(30000);
}
public static void main(String[] args){
SpringApplication.run(HttpSourceConfiguration.class, args);
}
}
Я получаю следующее исключение
org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
вот трассировка стека исключений
2019-03-04 22:42:15.915 ERROR 10856 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@1811b0b2]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=byte[2], headers={kafka_offset=8, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@b73245f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=reply, kafka_receivedTimestamp=1551724931910, contentType=application/json}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
... 31 more
2019-03-04 22:42:15.916 ERROR 10856 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = reply, partition = 0, offset = 8, CreateTime = 1551724931910, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@38ccb283)
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@1811b0b2]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.3.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) ~[spring-retry-1.2.3.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
... 31 common frames omitted
2019-03-04 22:42:18.921 ERROR 10856 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@1811b0b2]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=byte[2], headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@b73245f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=reply, kafka_receivedTimestamp=1551724933340, contentType=application/json}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
... 31 more
2019-03-04 22:42:18.921 ERROR 10856 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = reply, partition = 0, offset = 9, CreateTime = 1551724933340, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@54aeb1b9)
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@1811b0b2]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.3.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) ~[spring-retry-1.2.3.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
... 31 common frames omitted
заранее спасибо:)