У меня есть приложение чата с весенним реактивным webflux. Когда создается новое сообщение, все подписчики получают это сообщение (это просто пример для упрощения моей проблемы). Все правильно и отлично сработало. Но теперь мне нужно событие для подписчиков, когда пришло новое сообщение.
Это мой код:
Контроллер:
@Autowired
@Qualifier("ptpReplayProcessor")
private ReplayProcessor<String> ptpReplayProcessor;
@GetMapping(value = "/chat/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> subscribe() {
return Flux.from(ptpReplayProcessor);
}
ReplayProcessorConfig:
@Configuration
public class ReplayProcessorConfig {
@Bean("ptpReplayProcessor")
ReplayProcessor<String> ptpReplayProcessor() {
ReplayProcessor<String> replayProcessor = ReplayProcessor.create(0, false);
replayProcessor.subscribe(new BaseSubscriber<String>() {
@Override
protected void hookOnNext(String ptp) {
System.out.println("replay processor called!");
}
});
return replayProcessor;
}
}
pom.xml
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
Когда создается новое сообщение, я звоню ptpReplayProcessor.onNext(message)
. Это работает правильно, и все клиенты получают сообщение, но фраза replay processor called!
просто печатается от отправителя сообщения. Я хочу поднять событие для всех клиентов при получении нового сообщения. Также я попробовал ReplayProcessor.doOnNext()
и ReplayProcessor.doOnEach()
методы, но не сработало. Есть ли способ сделать это?