Исключение: org.springframework.messaging.MessageDeliveryException: у Dispatcher нет подписчиков на канал - PullRequest
2 голосов
/ 15 января 2020

У меня есть песочница для исследования новых добавленных функций в Spring Cloud Stream, но я столкнулся с проблемой использования Function и Supplier в одном приложении Spring Cloud Stream.

В коде я использовал примеры, описанные в * документы * 1004.

Сначала я добавил в проект Function<String, String> с соответствующими свойствами spring.cloud.stream.bindings и spring.cloud.stream.function.definition в application.yml. Все работает нормально, я отправляю сообщение на my-fun-in Kafka topi c, приложение выполняет функцию и отправляет результат на my-fun-out topi c.

Затем я добавил Supplier<Flux<String>> в тот же проект с соответствующее spring.cloud.stream.bindings и обновленное spring.cloud.stream.function.definition значение до fun;sup. И тут начинают происходить странные вещи. При попытке запустить приложение я получаю следующую ошибку:

2020-01-15 01:45:16.608 ERROR 10128 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.sup-out-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}], failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:206)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:188)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:219)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:57)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:165)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:148)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426)
    at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
    at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$2(FluxMessageChannel.java:83)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    ... 34 more

После этого я попробовал несколько вещей:

  1. Отменено spring.cloud.stream.function.definition до fun (отключить sup привязка бина к внешнему назначению). Приложение запущено, функция работает, поставщик не работает. Все как положено.
  2. Изменено spring.cloud.stream.function.definition на sup (отключить привязку компонента fun к внешнему назначению). Приложение запущено, функция не работает, поставщик работает (выдает сообщение my-sup-out topi c каждую секунду). Все как и ожидалось.
  3. Обновлено spring.cloud.stream.function.definition значение до fun;sup. Приложение не запустилось, получило то же исключение MessageDeliveryException.
  4. Поменял значение spring.cloud.stream.function.definition на sup;fun. Приложение запущено , поставщик работал, , но функция не работала (не отправлял сообщения на my-fun-out topi c).

последний сбил меня с толку даже больше, чем ошибка) Так что теперь мне нужна чья-то помощь, чтобы разобраться.

Я что-то упустил в конфигурации? Почему изменение порядка бобов, разделенное ; в spring.cloud.stream.function.definition, приводит к разным результатам?

Полный проект загружен в GitHub и добавлен ниже:

StreamApplication. java:

package com.kaine;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

import java.util.function.Function;
import java.util.function.Supplier;

@SpringBootApplication
public class StreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class);
    }

    @Bean
    public Function<String, String> fun() {
        return value -> value.toUpperCase();
    }

    @Bean
    public Supplier<Flux<String>> sup() {
        return () -> Flux.from(emitter -> {
            while (true) {
                try {
                    emitter.onNext("Hello from Supplier!");
                    Thread.sleep(1000);
                } catch (Exception e) {
                    // ignore
                }
            }
        });
    }
}

application.yml

spring:
  cloud:
    stream:
      function:
        definition: fun;sup
      bindings:
        fun-in-0:
          destination: my-fun-in
        fun-out-0:
          destination: my-fun-out
        sup-out-0:
          destination: my-sup-out

build.gradle.kts:

plugins {
    java
}

group = "com.kaine"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
        implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR1"))
        implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

        implementation(platform("org.springframework.boot:spring-boot-dependencies:2.2.2.RELEASE"))
}

configure<JavaPluginConvention> {
    sourceCompatibility = JavaVersion.VERSION_11
}

1 Ответ

1 голос
/ 15 января 2020

На самом деле это проблема с нашей документацией, так как я считаю, что мы приводим плохой пример реактивного Поставщика для его дела. Проблема в том, что ваш поставщик находится в бесконечной блокировке l oop. Это в основном никогда не возвращается. Поэтому, пожалуйста, измените его на что-то вроде:

@Bean
public Supplier<Flux<String>> sup() {
    return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {

        @Override
        public String get() {
            try {
                Thread.sleep(1000);
                return "Hello from Supplier";
            } catch (Exception e) {
                // ignore
            }
        }

    })).subscribeOn(Schedulers.elastic()).share();
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...