Reactive Redis не публикует постоянно изменения в Flux - PullRequest
4 голосов
/ 17 октября 2019

Я безуспешно пытаюсь получить живые обновления в моем списке заказов Redis. Кажется, что он выбирает все элементы и просто заканчивается на последнем элементе. Я бы хотел, чтобы клиент получал обновления по новому заказу в моем списке заказов. Чего мне не хватает?

Это мой код:

@RestController
class LiveOrderController {

    @Autowired
    lateinit var redisOperations: ReactiveRedisOperations<String, LiveOrder>

    @GetMapping(produces = [MediaType.TEXT_EVENT_STREAM_VALUE], value = "/orders")
    fun getLiveOrders(): Flux<LiveOrder> {
        val zops = redisOperations?.opsForZSet()
        return zops?.rangeByScore("orders", Range.unbounded())
    }
}

1 Ответ

2 голосов
/ 27 октября 2019

В Redis такой функции нет. Во-первых, реактивный поиск отсортированного набора - это просто моментальный снимок, но ваши вызовы происходят реактивным образом. Поэтому вам нужна подписка.

Если вы выбрали keyspace notifications, как это ( K - включить уведомления пространства клавиш, z - включить команды zset):

config set notify-keyspace-events Kz

И подпишитесь на них в вашем сервисе так:

  ReactiveRedisMessageListenerContainer reactiveRedisMessages;
  // ...
  reactiveRedisMessages.receive(new PatternTopic("__keyspace@0__:orders"))
      .map(m -> {
        System.out.println(m);
        return m;
      })
      <further processing>

Вы бы увидели сообщения, подобные этому: PatternMessage{channel=__keyspace@0__:orders, pattern=__keyspace@0__:orders, message=zadd}. Он сообщит вам, что что-то было добавлено. И вы можете как-то на это отреагировать - получить полный набор снова или только какую-то часть (голова / хвост). Вы могли бы даже вспомнить предыдущий набор, получить новый и отправить разность.

Но я действительно рекомендую перестроить поток так, чтобы напрямую использовать функциональность Redis Pub / Sub. Например: служба издателя вместо прямого вызова zadd вызовет eval, который выдаст 2 команды: zadd orders 1 x и publish orders "1:x" (любое пользовательское сообщение, которое вы хотите, возможно, JSON).

Затем в своем коде вы подпишетесь на свою тему, подобную этой:

return reactiveRedisMessages.receive(new PatternTopic("orders"))
      .map(LiveOrder::fromNotification);
...