Контролировать задержку выполнения повторного задания для Mono - PullRequest
0 голосов
/ 15 апреля 2020

Я пытаюсь реализовать механизм опроса. Я хотел бы увеличить или уменьшить интервал опроса на основе некоторых условий. Я использую Mono.repeat с delayElements для выполнения задачи повтора с интервалом. Но я не могу найти способ изменить задержку на основе некоторых критериев .

Mono.just(1).
    repeat().
    delayElements(getPollingInterval()).
    takeUntil((s)->
      {

          if(checkForEndCriteria()){
              log.info("Critera to end reached);
              return true;
          }
          return false;
      }).
    log().
    subscribeOn(Schedulers.boundedElastic()).
    flatMapSequential(x -> {
        List<Event> eventList = getEvents(id, lastItemTimeStamp);;
        if (!eventList.isEmpty()) {
            //Recieving events now. So want to decrease the interval.
            return Flux.fromIterable(eventList);
        } else {
        //There are no events happening .So I would like
        //to increase the delay of repeat task by 1 sec

            return Flux.just(buildHeartBeatEvent());
        }
    }).
    onErrorResume(error -> {
        log.error("Error occurred", error);
        return Flux.error(error);
    });```

1 Ответ

0 голосов
/ 15 апреля 2020

Я реализовал это с помощью Flux.<Duration>generate():

        Flux
            .<Duration>generate(sink -> {
                Date date = [NEXT_DATE];
                if (date != null) {
                    long millis = date.getTime() - System.currentTimeMillis();
                    sink.next(Duration.ofMillis(millis));
                }
                else {
                    sink.complete();
                }
            })
            .concatMap(duration ->
                    Mono.delay(duration)
                    ...
            )
            .repeat();

Итак, каждый раз, когда мы возвращаемся к generate() с этим repeat(), мы можем заглянуть в какое-то состояние, чтобы получить следующее исполнение Date.

...