Я создаю микросервис с использованием JHipster, и при попытке получить сообщение от kafka я сталкиваюсь со странной проблемой.Я использую 3 компьютера во внутренней локальной сети, чтобы сделать этот тест:
- Сервер Kafka в 192.168.0.200
- Микросервис шлюза в 192.168.0.7
- Блог микросервиса в192.168.0.9
Когда я меняю пользователя в микросервисе шлюза, он может правильно отправить сообщение в kafka, и через секунду я получаю следующую ошибку в микросервисе блога:
2018-11-23 09:33:22.628 ERROR 21063 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'blog-1.userChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[188], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@33bfc8cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=user-data-change, kafka_receivedTimestamp=1542972799625}], failedMessage=GenericMessage [payload=byte[188], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@33bfc8cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=user-data-change, kafka_receivedTimestamp=1542972799625}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1077)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1057)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:999)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:867)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:725)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[188], headers={kafka_offset=3, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@33bfc8cb, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=user-data-change, kafka_receivedTimestamp=1542972799625}]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
... 24 more
Это моя конфигурация kafka:
spring
cloud:
stream:
kafka:
binder:
brokers: 192.168.0.200
zk-nodes: 192.168.0.200
bindings:
gatewayChannel:
consumer:
enableDlq: true
dlqName: unprocessed
autoCommitOnError: true
autoCommitOffset: true
userChannel:
consumer:
enableDlq: true
dlqName: unprocessed
autoCommitOnError: true
autoCommitOffset: true
bindings:
gatewayChannel:
group: blog
destination: schema
content-type: application/json
userChannel:
group: blog
destination: user-data-change
content-type: application/json
И эти классы, которые потребляют сообщения:
@EnableBinding(value = {ConsumerChannel.class})
public class MessagingConfiguration {
private final CustomLiquibaseService customLiquibaseService;
public MessagingConfiguration(CustomLiquibaseService customLiquibaseService) {
this.customLiquibaseService = customLiquibaseService;
}
@StreamListener("gatewayChannel")
public void createSchema(DefaultMessage message) {
customLiquibaseService.createSchema(message.getMessage());
}
@StreamListener("userChannel")
public void processUser(Object message) throws Exception {
throw new Exception("test");
}
}
public interface ConsumerChannel {
@Input
SubscribableChannel gatewayChannel();
@Input
SubscribableChannel userChannel();
}
Как я, эта ошибка возникает, только когда микросервис шлюза добавляет сообщение в тему.Я подозреваю, что мой микросервис прослушивает, но не может обработать сообщение.Что я делаю не так?