Реактивный слушатель Spring Cloud Stream без вывода - PullRequest
0 голосов
/ 04 января 2019

Я использую Reactive Spring Cloud Stream, и у меня возникают проблемы при создании StreamListener без вывода.Следующий код работает, пока не получено искаженных сообщений.Когда получено искаженное сообщение, поток закрывается.

@StreamListener
public void handleMessage(@Input(MessagingConfig.INPUT) Flux<String> payloads) {
    payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave)).subscribe();
}

Если я правильно понимаю, предпочтительно, чтобы платформа подписывалась на поток, а не подписывалась на него вручную.Это не проблема, когда у слушателя есть выход, потому что я могу просто вернуть поток как:это возвращается.Есть ли способ позволить фреймворку обрабатывать поток, когда слушатель не указывает вывод?

Ответы [ 2 ]

0 голосов
/ 18 января 2019

Если вы действительно хотите избежать SCF, упомянутой в ответе Олега, попробуйте ниже, хакерский подход.

const val IN = "input"
const val OUT = "dummy-output"

interface Channels {
    @Input(IN)
    fun input(): MessageChannel

    @Output(OUT)
    fun output(): MessageChannel
}

@EnableBinding(Channels::class)
class MsgList {
    @StreamListener
    @Output(OUT)
    fun receive(@Input(IN) messages: Flux<String>): Flux<Void> {
        return messages
            .doOnNext { if (it == "err") throw IllegalStateException("err") }
            .doOnNext { println(it) }
            .flatMap { Mono.empty<Void>() }
    }
}

Будет создана выходная привязка, но сообщения не будут проходить.В случае RabbitMQ это означает, что появится фиктивный обмен, но очередь не будет создана.

Также ошибки будут обрабатываться, как вы ожидали.В приведенном выше примере вы можете отправить 3 сообщения: «ok», «err», «ok2», и вы увидите «ok», затем исключение, а затем «ok2» на экране.«Ok2» и любое последующее действительное сообщение будет обработано должным образом.

0 голосов
/ 04 января 2019

Рассмотрим переход на использование модели программирования Spring Cloud Function (SCF), которую мы недавно приняли .В принципе, если у вас самая последняя кодовая база (2.1.0.RC4 - самая последняя, ​​а RELEASE - через несколько дней), то все в порядке.Вот пример вашего кода с использованием модели программирования SCF:

@SpringBootApplication
@EnableBinding(Sink.class)
public class SampleReactiveConsumer {

    public static void main(String[] args) {
        SpringApplication.run(SampleReactiveConsumer.class, 
                   "--spring.cloud.stream.function.definition=consume");
    }

    @Bean
    public Consumer<Flux<String>> consume(){
        return payloads -> payloads.flatMap(objectToSave -> reactiveMongoTemplate.insert(objectToSave)).subscribe();
    }
}

Вы также можете удалить реактивный модуль из вашего пути к классам, так как мы также рассматриваем его все вместе как устаревший

...