Spring реактивный - событие при получении нового сообщения - PullRequest
5 голосов
/ 28 марта 2019

У меня есть приложение чата с весенним реактивным webflux. Когда создается новое сообщение, все подписчики получают это сообщение (это просто пример для упрощения моей проблемы). Все правильно и отлично сработало. Но теперь мне нужно событие для подписчиков, когда пришло новое сообщение.

Это мой код:

Контроллер:

@Autowired
@Qualifier("ptpReplayProcessor")
private ReplayProcessor<String> ptpReplayProcessor;

@GetMapping(value = "/chat/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> subscribe() {
    return Flux.from(ptpReplayProcessor);
}

ReplayProcessorConfig:

@Configuration
public class ReplayProcessorConfig {
    @Bean("ptpReplayProcessor")
    ReplayProcessor<String> ptpReplayProcessor() {
        ReplayProcessor<String> replayProcessor = ReplayProcessor.create(0, false);
        replayProcessor.subscribe(new BaseSubscriber<String>() {
            @Override
            protected void hookOnNext(String ptp) {
                System.out.println("replay processor called!");
            }

        });
        return replayProcessor;
    }
}

pom.xml

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
</dependency>

Когда создается новое сообщение, я звоню ptpReplayProcessor.onNext(message). Это работает правильно, и все клиенты получают сообщение, но фраза replay processor called! просто печатается от отправителя сообщения. Я хочу поднять событие для всех клиентов при получении нового сообщения. Также я попробовал ReplayProcessor.doOnNext() и ReplayProcessor.doOnEach() методы, но не сработало. Есть ли способ сделать это?

1 Ответ

2 голосов
/ 01 апреля 2019

вы можете сделать это по методу подписки:

return Flux.from(ptpReplayProcessor).doOnNext(s -> System.out.println("new message has been sent"));
...