Использование Flux.cache для повторного использования подписки на канал Redis в webflux - PullRequest
0 голосов
/ 09 октября 2018

Я использую Spring webflux для создания конечной точки, которая будет передавать события, полученные от подписки на канал Redis.

Это примерно так:

class MyService(redisTemplate: ReactiveRedisOperations<String, String>) {

    private val redisChannelFlux = redisTemplate
            .listenToChannel("myChannel")
            .map { it.message }
            .cache(0) // transforms this FLux into a reusable Hot publisher

    fun watch() : Flux<String> {
        return redisChannelFlux
    }

}

class MyController(val svc: MyService) {

    @GetMapping("/api/watch", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun watch() : Flux<String> {
        return svc.watch()
    }

}

Это работает.Когда клиент подписывается на конечную точку /api/watch, он начинает получать новые события от канала Redis, и я могу подтвердить на мониторе Redis, что "SUBSCRIBE" "myChannel" происходит только один раз, независимо от того, сколько клиентов подключено к моей реактивной конечной точке.Круто!

Я просто не уверен, насколько безопасно использовать Flux.cache() в этом сценарии.Я заигрываю с катастрофой здесь?Есть ли рекомендуемый способ повторного использования существующего издателя с новыми подписчиками?

1 Ответ

0 голосов
/ 09 октября 2018

Flux.cache() обычно используется для воспроизведения новым подписчикам последних N элементов этого Flux.Поскольку здесь вы устанавливаете это число на 0, кажется, вам просто интересно делиться ресурсами, а не воспроизводить последние события для нового подписчика.

Имея это в виду, вы можете просто использоватьFlux.share() вместо.Этот оператор подпишется на ваш экземпляр redis, как только придет первый подписчик, и поделится ресурсами со всеми остальными.Как только все подписчики исчезнут, соединение с вашим экземпляром redis будет закрыто, пока не придет другой подписчик и т. Д.

...