Spring Cloud - у Dispatcher нет подписчиков на канал - PullRequest
0 голосов
/ 23 ноября 2018

Я создаю микросервис с использованием JHipster, и при попытке получить сообщение от kafka я сталкиваюсь со странной проблемой.Я использую 3 компьютера во внутренней локальной сети, чтобы сделать этот тест:

  • Сервер Kafka в 192.168.0.200
  • Микросервис шлюза в 192.168.0.7
  • Блог микросервиса в192.168.0.9

Когда я меняю пользователя в микросервисе шлюза, он может правильно отправить сообщение в kafka, и через секунду я получаю следующую ошибку в микросервисе блога:

2018-11-23 09:33:22.628 ERROR 21063 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'blog-1.userChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[188], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@33bfc8cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=user-data-change, kafka_receivedTimestamp=1542972799625}], failedMessage=GenericMessage [payload=byte[188], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@33bfc8cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=user-data-change, kafka_receivedTimestamp=1542972799625}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1077)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1057)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:999)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:867)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:725)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[188], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@33bfc8cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=user-data-change, kafka_receivedTimestamp=1542972799625}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 24 more

Это моя конфигурация kafka:

spring
    cloud:
        stream:
            kafka:
                binder:
                    brokers: 192.168.0.200
                    zk-nodes: 192.168.0.200
                bindings:
                    gatewayChannel:
                        consumer:
                            enableDlq: true
                            dlqName: unprocessed
                            autoCommitOnError: true
                            autoCommitOffset: true
                    userChannel:
                        consumer:
                            enableDlq: true
                            dlqName: unprocessed
                            autoCommitOnError: true
                            autoCommitOffset: true
            bindings:
                gatewayChannel:
                    group: blog
                    destination: schema
                    content-type: application/json
                userChannel:
                    group: blog
                    destination: user-data-change
                    content-type: application/json

И эти классы, которые потребляют сообщения:

@EnableBinding(value = {ConsumerChannel.class})
public class MessagingConfiguration {

    private final CustomLiquibaseService customLiquibaseService;

    public MessagingConfiguration(CustomLiquibaseService customLiquibaseService) {
        this.customLiquibaseService = customLiquibaseService;
    }

    @StreamListener("gatewayChannel")
    public void createSchema(DefaultMessage message) {
        customLiquibaseService.createSchema(message.getMessage());
    }

    @StreamListener("userChannel")
    public void processUser(Object message) throws Exception {
        throw new Exception("test");
    }
}

public interface ConsumerChannel {

    @Input
    SubscribableChannel gatewayChannel();

    @Input
    SubscribableChannel userChannel();
}

Как я, эта ошибка возникает, только когда микросервис шлюза добавляет сообщение в тему.Я подозреваю, что мой микросервис прослушивает, но не может обработать сообщение.Что я делаю не так?

...