Переменный интервал через Flux.create / Flux.switchMap - PullRequest
1 голос
/ 21 марта 2019

Мне нужен таймер, который зависит от пользовательского входа. Вот самый маленький пример:

Flux.<Integer> create(e -> {
   log.info("create"); // Never gets triggered
   e.next(2); // Sample user input: change to 2 second interval
})
   .switchMap(v -> Flux.interval(Duration.ofSeconds(v)))
   .startWith(Flux.interval(Duration.ofSeconds(1)))
   .subscribe(e -> log.info("subscribe: {}", e)); // This works

В вышеприведенном:

  • Я создаю Flux<Integer>, который должен испускать вещи на основе пользовательского ввода (в приведенном выше примере он просто излучает 2),
  • Затем, исходя из этого, новый интервал переключается на switchMap
  • Начиная с 1-секундного интервала по умолчанию

Вышеприведенное работает ниже части switchMap, то есть я вижу, что она регистрирует «подписаться: N» каждую секунду, но «создать» не регистрируется, и e.next(2) никогда не вызывается.

Почему это не работает? Есть ли лучшее решение для этого варианта использования?

1 Ответ

1 голос
/ 21 марта 2019

Как описано в JavaDoc, Flux#startWith будет предшествовать данной последовательности.

Поскольку вы передаете Flux.interval(Duration.ofSeconds(1)) в качестве аргумента, он будет бесконечно испускать long каждую секунду, и ваш основанный на Flux.create издатель никогда не будет подписан.

Однако, это работает, если вы измените его на:

.startWith(Mono.delay(Duration.ofSeconds(1)))

Вы также можете изменить код на:

Flux.<Integer> create(e -> {
   log.info("create");
   e.next(2);
})
   .startWith(1)
   .switchMap(v -> Flux.interval(Duration.ofSeconds(v)))
   .subscribe(e -> log.info("subscribe: {}", e));

Здесь мы используем startWith сразу после блока Flux.create и позволяем switchMap обрабатывать его как любой другой сигнал.

Также помните, что switchMap(v -> Flux.interval(Duration.ofSeconds(v))) читается как:
«Начать излучать каждые N секунд, где N - самое последнее излучаемое значение»

Если вам нужно «отложить» это только один раз, рассмотрите возможность использования Mono.delay и здесь.

...