Я использую Spring webflux для создания конечной точки, которая будет передавать события, полученные от подписки на канал Redis.
Это примерно так:
class MyService(redisTemplate: ReactiveRedisOperations<String, String>) {
private val redisChannelFlux = redisTemplate
.listenToChannel("myChannel")
.map { it.message }
.cache(0) // transforms this FLux into a reusable Hot publisher
fun watch() : Flux<String> {
return redisChannelFlux
}
}
class MyController(val svc: MyService) {
@GetMapping("/api/watch", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun watch() : Flux<String> {
return svc.watch()
}
}
Это работает.Когда клиент подписывается на конечную точку /api/watch
, он начинает получать новые события от канала Redis, и я могу подтвердить на мониторе Redis, что "SUBSCRIBE" "myChannel"
происходит только один раз, независимо от того, сколько клиентов подключено к моей реактивной конечной точке.Круто!
Я просто не уверен, насколько безопасно использовать Flux.cache()
в этом сценарии.Я заигрываю с катастрофой здесь?Есть ли рекомендуемый способ повторного использования существующего издателя с новыми подписчиками?