Как изменить поток выполнения в Reactor - PullRequest
0 голосов
/ 02 мая 2018
@RequestMapping(value    = "/try",
      method   = RequestMethod.GET)
      @ResponseBody
public String demo(){
List<String>data=new ArrayList<>();
data.add("A1");
data.add("A2");
data.add("A3");
data.add("A4");

Flux.fromIterable(data).subscribe(s->printStatement(s));
return "done";
}

public  void printStatement(String s){
long i;
for(i=0;i<1000000000;i++)
{}
LOGGER.info(s+"------"+Thread.currentThread().getId());
}

Здесь, в приведенном выше примере, я надеялся, что идентификатор протектора будет другим (скачкообразное выполнение). Из журнала я мог видеть, что один и тот же протектор выполняет весь процесс

Вход:

2018-05-02 03:24:42.387  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A1------26
2018-05-02 03:24:44.118  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A2------26
2018-05-02 03:24:44.418  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A3------26
2018-05-02 03:24:44.717  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A4------26

Как мне убедиться, что он выполняется асинхронно.

1 Ответ

0 голосов
/ 02 мая 2018

Модель исполнения Reactor состоит в том, что большинство операторов не изменяют поток для вас (кроме случаев, когда требуется время). Библиотека предлагает два оператора, которые позволяют переключаться на потоки, publishOn (самый распространенный) и subscribeOn.

Например, Flux.fromIterable(data).publishOn(Schedulers.newSingle("example")).subscribe(...) будет подходить сюда.

Обратите внимание, что модель WebFlux заключается в том, что она запускает обработку цепочки в потоках Netty, в этих nio потоках, которые вы видите. Таким образом, очень важно, чтобы вы не блокировали эти потоки (это могло бы полностью предотвратить обработку дальнейших входящих запросов).

Schedulers предлагает фабричные методы для различных разновидностей Scheduler, что является абстракцией Reactor (более или менее поверх ExecutorService).

...