Я занимаюсь темой реактивного программирования.
Я использую WebFlux и R2dbc для доступа к базе данных.
Я разрабатываю небольшой бэкэнд для приложения чата.
Поэтому у меня есть несколько вопросов по этому вопросу, и я был бы очень рад предложениям и улучшениям.
Это класс обслуживания
@Service
public class MessageService implements IMessageService {
UnicastProcessor<Message> hotProcessor = UnicastProcessor.create();
private final FluxSink<Message> fluxSink = hotProcessor.sink(FluxSink.OverflowStrategy.LATEST);
private final Flux<Message> hotFlux = hotProcessor.publish().autoConnect();
@Autowired
private MessageRepository messageRepository;
public Flux<Message> findAll() {
return this.messageRepository.findAll();
}
public Mono<Message> create(Message nachricht) {
this.fluxSink.next(nachricht);
return this.messageRepository.save(nachricht);
}
// how can I persist messages and to arrange a chat room.
public Flux<Message> finAllMessagesByChatroomId(Long id) {
return hotFlux.filter(m->m.getId().intValue() == id.intValue());
}
}
Когда я должен использовать UnicastProcessor, когда DirectProcessor.
Обновление:
небольшая коррекция
public Flux<Message> finAllMessagesByChatroomId(Long id) {
return hotFlux.filter(m->m.getRoom_id == id);
}
Но как мне получить все Сообщения из базы данных и добавить их в fluxSink?