У меня есть это исключение, когда я пытаюсь отправить сообщение из одного весеннего микросервиса в другой. Интересно, что сообщение успешно отправлено с одного микросервиса через Kafka
на второй, но это исключение все еще происходит.
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.topicInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[106], headers={kafka_offset=37, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@344142c9, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, eventType=SubmitFormEvent, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=topic, kafka_receivedTimestamp=1588161866654}]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70) ~[spring-integration-kafka-3.1.0.M1.jar:3.1.0.M1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) ~[spring-integration-kafka-3.1.0.M1.jar:3.1.0.M1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) ~[spring-integration-kafka-3.1.0.M1.jar:3.1.0.M1]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.3.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) ~[spring-retry-1.2.3.RELEASE.jar:na]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_242]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_242]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_242]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
... 26 common frames omitted
Моя конфигурация:
Слушатель:
@Component
public class Listener {
@Override
@StreamListener(condition = "headers['eventType'] == 'SubmitFormEvent' ", value = TOPIC_INPUT_CHANNEL)
public void handle(SubmitFormEvent event) {
...// I succesfully entered here
}
}
Основной класс получения микросервиса:
@SpringBootApplication
@EnableBinding(KafkaStreams.class)
public class CamundaApplication extends SpringBootProcessApplication {
public static void main(String[] args) {
SpringApplication.run(CamundaApplication.class);
}
}
Основной класс отправки микросервиса:
@SpringBootApplication
@EnableBinding(KafkaStreams.class)
public class ClaiminiApplication {
public static void main(String[] args) {
SpringApplication.run(ClaiminiApplication.class);
}
}
application.properties
Файл обоих проектов содержит следующие параметры:
spring.cloud.stream.kafka.binder.brokers=kafka:9092
spring.cloud.stream.bindings.topicInputChannel.destination=topic
spring.cloud.stream.bindings.topicInputChannel.content-type=application/json
spring.cloud.stream.bindings.topicOutputChannel.destination=topic
spring.cloud.stream.bindings.topicOutputChannel.content-type=application/json
Класс KafkaStreams:
public interface KafkaStreams {
String TOPIC_INPUT_CHANNEL = "topicInputChannel";
String TOPIC_OUTPUT_CHANNEL = "topicOutputChannel";
@Input(TOPIC_INPUT_CHANNEL)
SubscribableChannel topicInputChannel();
@Output(TOPIC_OUTPUT_CHANNEL)
MessageChannel topicOutputChannel();
}