В первый раз, используя WebFlux, Flux, реактивный и т. Д. У меня есть простая настройка сервера для GET, POST, PUT, DELETE Message. Я хотел бы передать значение через поток событий, отправленных сервером, как только маршрутизатор сервера получит POST.
fun router() = router {
accept(TEXT_EVENT_STREAM).nest {
GET("/messages/events", messageHandler::showLastMessage)
}
"/".nest {
(accept(APPLICATION_JSON) and "/messages").nest {
GET("/", messageHandler::getMessages)
POST("/", messageHandler::addMessage)
GET("/{id}", messageHandler::getMessage)
PUT("/{id}", messageHandler::updateMessage)
DELETE("/{id}", messageHandler::deleteMessage)
}
}
resources("/**", ClassPathResource("static/"))
}.filter { request, next ->
next.handle(request).flatMap {
if (it is RenderingResponse) RenderingResponse.from(it).modelAttributes(attributes(request.locale(), messageSource)).build() else it.toMono()
}
}
В этом примере / messages / events просто повторяет всю коллекцию (потому что я не знаю, что я делаю)
val lastMessage = Flux.just(messageMap.values)
val lastMessageStream = Flux
.zip(Flux.interval(Duration.ofMillis(1000)), lastMessage.repeat())
fun showLastMessage(req: ServerRequest) =
ok().bodyToServerSentEvents(lastMessageStream)
Мои вопросы (я думаю):
Возможно ли излучение из Flux, аналогично interval (), но ТОЛЬКО при изменении коллекции, или что-то запускает испускание Flux? Другими словами:
Flux.wheneverIgetSomething(thatSomething)