используя http: inboundgateway с интеграцией Kafka Spring - PullRequest
0 голосов
/ 04 марта 2019

Я новичок в весенней интеграции Kafka и весеннем облачном потоке данных.поэтому моя проблема заключается в том, что я пытаюсь создать http-пост с обменом запросами / ответами, используя http inboundgateway

, чтобы получить следующий поток

:requestchannel > processor > replychannel

Мой текущий код выглядит так:

public class HttpSourceConfiguration {

    @Autowired
    private Source channels;

    @Autowired
    private HttpSourceProperties properties;

    @Bean
    public HttpRequestHandlingEndpointSupport httpSourceString() {
        return buildHttpRequestHandlerEndpointSpec("text/*", "application/json")
                .requestPayloadType(String.class)
                .get();
    }

    //@Bean
    public HttpRequestHandlingEndpointSupport httpSourceBytes() {
        return buildHttpRequestHandlerEndpointSpec("*/*")
                .get();
    }
    interface OutBandGateWayChannel {

        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();
    }
    private HttpRequestHandlerEndpointSpec buildHttpRequestHandlerEndpointSpec(final String... consumes) {
        return Http.inboundGateway(this.properties.getPathPattern()).
                mappedRequestHeaders(this.properties.getMappedRequestHeaders())
                .statusCodeExpression(new ValueExpression<>(HttpStatus.ACCEPTED))
                .requestMapping(requestMapping ->
                        requestMapping.methods(HttpMethod.POST)
                                .consumes(consumes))
                .crossOrigin(crossOrigin ->
                        crossOrigin.origin(this.properties.getCors().getAllowedOrigins())
                                .allowedHeaders(this.properties.getCors().getAllowedHeaders())
                                .allowCredentials(this.properties.getCors().getAllowCredentials()))

                .requestChannel(this.channels.output() )
                .replyChannel(OutBandGateWayChannel.REPLY)
                .replyTimeout(30000);

    }
    public static void main(String[] args){
        SpringApplication.run(HttpSourceConfiguration.class, args);
    }
}

Я получаю следующее исключение

org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

вот трассировка стека исключений

2019-03-04 22:42:15.915 ERROR 10856 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@1811b0b2]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=byte[2], headers={kafka_offset=8, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@b73245f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=reply, kafka_receivedTimestamp=1551724931910, contentType=application/json}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    ... 31 more

2019-03-04 22:42:15.916 ERROR 10856 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = reply, partition = 0, offset = 8, CreateTime = 1551724931910, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@38ccb283)

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@1811b0b2]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.3.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) ~[spring-retry-1.2.3.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    ... 31 common frames omitted

2019-03-04 22:42:18.921 ERROR 10856 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@1811b0b2]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available, failedMessage=GenericMessage [payload=byte[2], headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@b73245f, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=reply, kafka_receivedTimestamp=1551724933340, contentType=application/json}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    ... 31 more

2019-03-04 22:42:18.921 ERROR 10856 --- [container-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = reply, partition = 0, offset = 9, CreateTime = 1551724933340, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@54aeb1b9)

org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@1811b0b2]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:369) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$400(KafkaMessageDrivenChannelAdapter.java:74) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:431) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:402) ~[spring-integration-kafka-3.1.0.RELEASE.jar:3.1.0.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.3.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) ~[spring-retry-1.2.3.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    ... 31 common frames omitted

заранее спасибо:)

1 Ответ

0 голосов
/ 04 марта 2019

В Spring Cloud Stream нет поддержки запросов-ответов.Он предназначен для потоковой передачи .Вы можете объединить Source и Sink (не процессор) в одном приложении для отправки запросов и прослушивания ответов в какой-либо другой теме, но впоследствии вам нужно будет сопоставить запрос с ответами самостоятельно.

Что яможет помочь вам что-то вроде этого: https://github.com/spring-cloud/spring-cloud-stream/issues/815

...