Flux из WebClient ведет себя иначе, чем Flux из File.readLines - PullRequest
0 голосов
/ 29 ноября 2018

У меня есть два разных источника некоторых идентификаторов, с которыми мне нужно работать.Один из файла, другой из URL.Когда я создаю Flux из строк Файлов, я могу отлично работать над этим.Когда я переключаю функцию создания потока на ту, которая использует WebClient .... get (), я получаю разные результаты;WebClient никогда не вызывается по какой-то причине.

private Flux<String> retrieveIdListFromFile(String filename) {
  try {
    return Flux.fromIterable(Files.readAllLines(ResourceUtils.getFile(filename).toPath()));
  } catch (IOException e) {
    return Flux.error(e);
  }
}

Здесь часть WebClient ...

private Flux<String> retrieveIdList() {
  return client.get()
      .uri(uriBuilder -> uriBuilder.path("capdocuments_201811v2/selectRaw")
          .queryParam("q", "-P_Id:[* TO *]")
          .queryParam("fq", "DateLastModified:[2010-01-01T00:00:00Z TO 2016-12-31T00:00:00Z]")
          .queryParam("fl", "id")
          .queryParam("rows", "10")
          .queryParam("wt", "csv")
          .build())
      .retrieve()
      .bodyToFlux(String.class);
}

Когда я делаю subscribe(System.out::println) в потоке WebClient, ничего не происходит.Когда я делаю blockLast (), он работает (URL вызывается, данные возвращаются).Я не понимаю, почему и как это исправить, и что я делаю неправильно.С потоком, который исходит из файла, даже подписка работает нормально.Я как бы подумала, что Fluxes взаимозаменяемы ...

Когда я делаю retrieveIdList().log().subscribe():

INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)

Когда я делаю то же самое с blockLast () вместо subscribe ():

INFO [main] reactor.Flux.OnAssembly.1 | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
INFO [main] reactor.Flux.OnAssembly.1 | request(unbounded)
INFO [reactor-http-nio-4] reactor.Flux.OnAssembly.1 | onNext(id)
.
.
.

1 Ответ

0 голосов
/ 30 ноября 2018

Судя по вашему обновлению вопроса, кажется, что ничего не ждет завершения обработки.Я предполагаю, что это пакетное или CLI-приложение, а не веб-приложение?

Предполагается следующее:

Flux<User> users = userService.fetchAll();

Вызов blockLast на Flux вызовет обработку и block до тех пор, пока не будет получен результат.

Вызов subscribe для него запустит обработку асинхронно;мы видим подписные элементы request в ваших журналах, но не более того.Это, вероятно, означает, что JVM завершает работу перед публикацией каких-либо элементов - ничего не ждет от результата.

Если вы эффективно пишете какое-либо CLI / пакетное приложение и не обрабатываете запросы в веб-приложении, вы можете block на конечном реактивном трубопроводе, чтобы получить результат.Если вы хотите записать этот результат в файл или отправить его в другой сервис, вам следует составить его с операторами реактора.

...