Используйте Reactor Core Java для периодической отправки в браузер статуса давно выполняемой задачи - PullRequest
0 голосов
/ 10 мая 2018

Я работал с некоторым Reactor Core Java, потому что я хочу выяснить, возможно ли это решить одну проблему, которая у меня есть на данный момент при использовании этой инфраструктуры.

В настоящее время у меня долгая, выполняемая работа, которая занимает около 40-50 минут. Метод выглядит примерно так:

  public void doLongTask(List<Something> list){
    //instructions.
    for(Something sm : list){
      if(condition){
         executeLongOperation();
      }
      //instructions
      if(condition){
       executeLongOperation();
      }
    }
  }

в моем контроллере у меня есть что-то вроде этого:

@GetMapping(path = "/integersReactor", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Integer> getIntegersReactor(){
    logger.debug("Request getIntegersReactor initialized.");
    return simpleSearchService.getIntegersReactor();
}

и в слое обслуживания у меня есть что-то вроде этого:

@Override
public Flux<Integer> getIntegersReactor(){
    return Flux.range(0, Integer.MAX_VALUE);
}

это просто заполнитель, который я использую в качестве доказательства концепции. Мои настоящие намерения состоят в том, чтобы как-то вернуть поток некоторого объекта, который я сам определю, у этого объекта будет несколько полей, которые я буду использовать, чтобы сообщить потребителю статус задания.

Теперь все становится несколько сложнее, потому что я хотел бы отправлять обновления как executeLongOperation (); выполняются, и каким-то образом вместо возврата потока целых чисел возвращают поток объекта, который использует возврат executeLongOperation ();

Может ли это быть достигнуто с Flux? Как я могу использовать Java Reactor Core, чтобы выдавать возвращаемые значения за все время executeLongOperation (); выполняется в реактивный поток, который может быть передан в контроллер так же, как это делает метод getIntegersReactor () в моем примере?

1 Ответ

0 голосов
/ 11 мая 2018

Да, это должно быть возможно, но поскольку executeLongOperation блокирует, его необходимо будет сместить в выделенном потоке (что уменьшает преимущества, которые вы получаете от реактивной реализации сверху вниз).

Измените doLongTask, чтобы он возвращал Flux<Foo>, сделайте его объединенным Mono s, которые обертывают executeLongOperation в выделенном потоке (или, что еще лучше, измените сам executeLongOperation, чтобы он возвращал Mono<Foo>, и выполните внутренняя обертка и внутренняя subscribeOn другая нить). Что-то вроде:

public Flux<Foo> doLongTask(List<Something> list) {
    return Flux.fromIterable(list)
               //ensure `Something` are published on a dedicated thread on which
               //we can block
               .publishOn(Schedulers.elastic()) //maybe a dedicated Scheduler?
               //for each `Something`, perform the work
               .flatMap(sm -> {
                 //in case condition is false, we'll avoid long running task
                 Flux<Foo> work = Flux.empty();
                 //start declaring the work depending on conditions
                 if(condition) {
                     Mono<Foo> op = Mono.fromCallable(this::executeLongOperation);
                     work = conditional.concatWith(op);
                 }
                 //all other instructions should preferably be non-blocking
                 //but since we're on a dedicated thread at this point, it should be ok
                 if(condition) {
                     Mono<Foo> op = Mono.fromCallable(this::executeLongOperation);
                     work = conditional.concatWith(op);
                 }
                 //let the flatMap trigger the work
                 return work;
               });
}
...