MessageDispatchingException: у Dispatcher нет подписчиков, но сообщение по-прежнему отправляется через Kafka - PullRequest
0 голосов
/ 29 апреля 2020

У меня возникает это исключение, когда я пытаюсь отправить сообщение из одного микросервиса на Spring в другой. Интересно, что сообщение успешно доставляется через Kafka из первого микросервиса во второй, но исключение всё равно возникает.

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application-1.topicInputChannel'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[106], headers={kafka_offset=37, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@344142c9, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, eventType=SubmitFormEvent, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=topic, kafka_receivedTimestamp=1588161866654}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70) ~[spring-integration-kafka-3.1.0.M1.jar:3.1.0.M1]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) ~[spring-integration-kafka-3.1.0.M1.jar:3.1.0.M1]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) ~[spring-integration-kafka-3.1.0.M1.jar:3.1.0.M1]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.3.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) ~[spring-retry-1.2.3.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1220) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1213) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1174) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1155) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1096) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:924) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:740) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_242]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_242]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_242]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
    ... 26 common frames omitted

Моя конфигурация:

Слушатель:

/**
 * Spring component that declares the stream listener for {@code SubmitFormEvent} messages.
 */
@Component
public class Listener {

    /**
     * Handles messages arriving on {@code TOPIC_INPUT_CHANNEL} whose {@code eventType}
     * header equals {@code SubmitFormEvent}.
     *
     * @param event the event delivered to this listener
     */
    // NOTE(review): @Override needs a supertype that declares handle(SubmitFormEvent);
    // no extends/implements clause is visible on this class — confirm.
    @Override
    @StreamListener(condition = "headers['eventType'] == 'SubmitFormEvent' ", value = TOPIC_INPUT_CHANNEL)
    public void handle(SubmitFormEvent event) {
        ...// I successfully entered here
    }
}

Основной класс принимающего микросервиса:

/**
 * Spring Boot entry point of the receiving microservice; enables the
 * {@link KafkaStreams} channel bindings.
 */
@SpringBootApplication
@EnableBinding(KafkaStreams.class)
public class CamundaApplication extends SpringBootProcessApplication {

  /**
   * Starts the application.
   *
   * @param args command-line arguments, forwarded to Spring Boot so that
   *     overrides such as {@code --server.port=8081} take effect
   */
  public static void main(String[] args) {
    // Forward args: run(Class) alone silently discards command-line properties.
    SpringApplication.run(CamundaApplication.class, args);
  }
}

Основной класс отправляющего микросервиса:

/**
 * Spring Boot entry point of the sending microservice; enables the
 * {@link KafkaStreams} channel bindings.
 */
// NOTE(review): KafkaStreams declares both an @Input and an @Output channel, so enabling it
// here presumably binds the input channel in this service as well. If this service has no
// @StreamListener for that channel, messages it consumes would find no subscriber — confirm
// whether the sender should bind only the output channel.
@SpringBootApplication
@EnableBinding(KafkaStreams.class)
public class ClaiminiApplication {

  /**
   * Starts the application.
   *
   * @param args command-line arguments, forwarded to Spring Boot so that
   *     overrides such as {@code --server.port=8081} take effect
   */
  public static void main(String[] args) {
    // Forward args: run(Class) alone silently discards command-line properties.
    SpringApplication.run(ClaiminiApplication.class, args);
  }
}

Файл application.properties обоих проектов содержит следующие параметры:

# Kafka broker address used by the Spring Cloud Stream Kafka binder.
spring.cloud.stream.kafka.binder.brokers=kafka:9092
# Input channel binding: destination "topic", JSON payloads.
spring.cloud.stream.bindings.topicInputChannel.destination=topic
spring.cloud.stream.bindings.topicInputChannel.content-type=application/json
# Output channel binding: same destination "topic" as the input channel, JSON payloads.
# NOTE(review): a service using this file both reads from and writes to "topic" — confirm intended.
spring.cloud.stream.bindings.topicOutputChannel.destination=topic
spring.cloud.stream.bindings.topicOutputChannel.content-type=application/json

Интерфейс KafkaStreams:

/**
 * Binding interface declaring one input and one output message channel.
 * The channel names are exposed as constants so other code can refer to them.
 */
public interface KafkaStreams {

    /** Name of the input channel. */
    String TOPIC_INPUT_CHANNEL = "topicInputChannel";

    /**
     * Returns the channel bound under {@link #TOPIC_INPUT_CHANNEL}.
     *
     * @return the subscribable input channel
     */
    @Input(value = TOPIC_INPUT_CHANNEL)
    SubscribableChannel topicInputChannel();

    /** Name of the output channel. */
    String TOPIC_OUTPUT_CHANNEL = "topicOutputChannel";

    /**
     * Returns the channel bound under {@link #TOPIC_OUTPUT_CHANNEL}.
     *
     * @return the output message channel
     */
    @Output(value = TOPIC_OUTPUT_CHANNEL)
    MessageChannel topicOutputChannel();
}
...