Как повторить или повторить с динамическими значениями c, используя Project Reactor? - PullRequest
0 голосов
/ 24 апреля 2020

У меня есть метод, который выполняет вызов службы и возвращает значение как Mono. Пока я не получу желаемое значение, мне придется повторить выполнение этого метода с некоторым откатом.

Я пытаюсь выполнить это требование, используя функциональность repeat() Project Reactor.

Упрощенный пример моего варианта использования будет выглядеть примерно так:

Mono<Boolean> evaluateValue = fetchValueFromService(id)
          .filter(value -> {
            if (value != requiredValue) {
              id++;
              return false;
            }
            return true;
          })
          .repeatWhenEmpty(Repeat
              .onlyIf(repeatContext -> true)
              .fixedBackoff(Duration.ofSeconds(5)));
evaluateValue.subscribe();

Проблема с этим подходом состоит в том, что мы не можем обновить значение id, поскольку

Используемая переменная в лямбда-выражении должно быть окончательным или фактически окончательным

Мне удалось преодолеть это с помощью AtomicReference.

AtomicReference<Integer> id = new AtomicReference(0);
Mono<Boolean> evaluateValue = fetchValueFromService(id.get())
          .filter(value -> {
            if (value != requiredValue) {
              System.out.println(id.incrementAndGet());
              return false;
            }
            return true;
          })
          .repeatWhenEmpty(Repeat
              .onlyIf(repeatContext -> true)
              .fixedBackoff(Duration.ofSeconds(5)));
evaluateValue.subscribe();

Однако это не решает проблему, так как только начальное значение передается каждый раз. Интересно, это потому, что Mono неизменен и не может быть изменен?

Есть ли другой способ решить эту проблему? Можно ли будет динамически обновлять параметр / аргумент, используемый методом (который повторяется)?

Спасибо.

1 Ответ

0 голосов
/ 25 апреля 2020

Взгляните на create / generate методов здесь для динамического создания потока / моно.

Это может решить вашу проблему.

  Mono.create(sink ->
        sink.success(fetchValueFromService(id.get()))
    )

Например:

    AtomicInteger atomicInteger = new AtomicInteger(0);
    Mono.create(sink -> sink.success(atomicInteger.getAndIncrement()))
            .repeat(10)
            .subscribe(System.out::println);

Печатает до 10 из 0.

...