Реактивное программирование: Spring WebFlux: Как построить цепочку вызовов микро-услуг? - PullRequest
1 голос
/ 26 марта 2020

Spring Boot Приложение:

a @RestController получает следующую полезную нагрузку:

{
  "cartoon": "The Little Mermaid",
  "characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}

Мне нужно обработать ее следующим образом:

  1. Получить уникальный идентификатор для каждого имени персонажа: сделать HTTP-вызов для микросервиса «карикатурные персонажи», который возвращает идентификаторы по именам
  2. Преобразовать данные, полученные контроллером: заменить имена персонажей соответствующими идентификаторами были получены на предыдущем шаге от микросервиса "герои мультфильмов". { "cartoon": "The Little Mermaid", "characterIds": [1, 2, 3, 4] }

  3. Отправьте HTTP-запрос POST в микросервис «cartoon-db» с преобразованными данными.

  4. Сопоставьте ответ от «cartoon-db» с внутреннее представление, которое является возвращаемым значением контроллера.

Проблема, которую я получил:

Мне нужно реализовать все эти шаги, используя парадигму Reactive Programming (неблокирующая \ asyn c processing) с Spring WebFlux (Mono | Flux) и Spring Reactive WebClient - но у меня нет опыта работы с этим стеком, я стараюсь читать об этом столько, сколько могу, плюс много гуглить, но все же , есть куча неотвеченных вопросов, например:

Q1 . Я уже настроил реактивный webClient, который отправляет запрос в микросервис «мультипликационные персонажи»:

      public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
    return WebClient.builder().baseUrl("http://cartoon-characters").build()
        .get()
        .uri("/character/{characterName}", characterName)
        .retrieve()
        .bodyToMono(Integer.class);
  }

Как видите, у меня есть список имен мультипликационных персонажей, и для каждого из них мне нужно позвонить getCartoonCharacterIdbyName(String name) метод, я не уверен, что правильный вариант для его последовательного вызова, считаю правильным вариант: параллельное выполнение.

Написал следующий метод:

  public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
    .flatMap(this::getCartoonCharacterIdbyName);

return StreamSupport.stream(flux.toIterable().spliterator(), false)
    .collect(Collectors.toList());

}

но у меня есть сомнения, что этот код выполняет параллельное WebClient выполнение, а также кодовые вызовы flux.toIterable(), которые блокируют поток, поэтому в этой реализации я потерял неблокирующий механизм.

предположения верны?

Как мне переписать его, чтобы он имел параллелизм и неблокирование?

Q2. Возможно ли технически преобразовать входные данные, полученные контроллером? (Я имею в виду заменить имена на идентификаторы) в реактивном стиле: когда мы работаем с Flux<Integer> символами, но не с List<Integer> символами?

Q3. Возможно ли это чтобы получить не просто преобразованный объект данных, но Mono <> после шаг 2 , который может быть использован другим веб-клиентом в шаг 3 ?

1 Ответ

1 голос
/ 29 марта 2020

На самом деле это хороший вопрос, так как для понимания WebFlux или структуры реактора проекта, когда речь идет о цепочке микросервисов, требуется пара шагов.

Первое - это понять, что WebClient должен взять издателя и вернуть издателя. Экстраполируйте это на 4 различных сигнатуры метода, чтобы помочь с мышлением.

  • Моно -> Моно
  • Флюс -> Флюс
  • Моно -> Флюс
  • Flux -> Mono

Конечно, во всех случаях это просто Publisher-> Publisher, но оставьте это, пока вы не поймете вещи лучше. Первые два очевидны, и вы просто используете .map(...) для обработки объектов в потоке, но вам нужно научиться обрабатывать вторые два. Как указано выше, переход от Flux-> Mono может быть выполнен с помощью .collectList() или также с .reduce(...). Переход от Mono-> Flux, по-видимому, обычно выполняется с .flatMapMany или .flatMapIterable или некоторым другим вариантом. Есть, вероятно, другие методы. Вы никогда не должны использовать .block() в любом коде WebFlux, и, как правило, вы получите ошибку времени выполнения, если попытаетесь это сделать.

В вашем примере вы хотите от go до

  • (моно-> флюс) -> (флюс-> флюс) -> (флюс-> флюс)

Как вы сказали, вы хотите

  • Mono-> Flux-> Flux

Вторая часть заключается в понимании цепочки потоков. Вы могли бы сделать

  • p3 (p2 (p1 (объект)));

, который бы связал p1-> p2-> p3, но я всегда находил это более понятным вместо этого создать «Сервисный слой».

  • o2 = p1 (объект);
  • o3 = p2 (o2);
  • result = p3 (o3) ;

Этот код намного легче читать и поддерживать, и с некоторой зрелостью вы понимаете ценность этого утверждения.

Единственная проблема, с которой я столкнулся на вашем примере делал Flux<String> с WebClient как @RequestBody. Не работает См. WebClient bodyToFlux (String.class), чтобы список строк не разделял отдельные значения . Помимо этого, это довольно простое приложение. При отладке вы обнаружите, что он достигает строки .subscribe(System.out::println), прежде чем он попадет в строку Flux<Integer> ids = mapNamesToIds(fn). Это потому, что поток настроен до его выполнения. Требуется время, чтобы понять это, но в этом суть проекта реактора.

@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    Map<Integer, CartoonCharacter> characters;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
        characters = Arrays.asList( new CartoonCharacter[] {
                new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"), 
                new CartoonCharacter(names[1].hashCode(), names[1], "Human"), 
                new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"), 
                new CartoonCharacter(names[3].hashCode(), names[3], "Fish")} 
        )
        .stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
        // TODO Auto-generated method stub
        CartoonRequest cr = CartoonRequest.builder()
        .cartoon("The Little Mermaid")
        .characterNames(Arrays.asList(names))
        .build();
        thisLocalClient
            .post()
            .uri("cartoonDetails")
            .body(Mono.just(cr), CartoonRequest.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class)
            .subscribe(System.out::println);
    }

    @Bean
    WebClient localClient() {
        return WebClient.create("http://localhost:8080/demo/");
    }

    @Autowired
    WebClient thisLocalClient;

    @PostMapping("cartoonDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
        Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
        Flux<Integer> ids = mapNamesToIds(fn);
        Flux<CartoonCharacter> details = mapIdsToDetails(ids);
        return details;
    }
    //  Service Layer Methods
    private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
        return thisLocalClient
            .post()
            .uri("findIds")
            .body(names, StringWrapper.class)
            .retrieve()
            .bodyToFlux(Integer.class);
    }
    private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
        return thisLocalClient
            .post()
            .uri("findDetails")
            .body(ids, Integer.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class);
    }
    // Services
    @PostMapping("findIds")
    Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
        return names.map(name->name.getString().hashCode());
    }
    @PostMapping("findDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
        return ids.map(characters::get);
    }
}

Также:

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
    private String string;
}
@Data
@Builder
public class CartoonRequest {
    private String cartoon;
    private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
    Integer id;
    String name;
    String species;
}
...