На самом деле это хороший вопрос, так как для понимания WebFlux или структуры реактора проекта, когда речь идет о цепочке микросервисов, требуется пара шагов.
Первое - это понять, что WebClient
должен взять издателя и вернуть издателя. Экстраполируйте это на 4 различных сигнатуры метода, чтобы помочь с мышлением.
- Моно -> Моно
- Флюс -> Флюс
- Моно -> Флюс
- Flux -> Mono
Конечно, во всех случаях это просто Publisher-> Publisher, но оставьте это, пока вы не поймете вещи лучше. Первые два очевидны, и вы просто используете .map(...)
для обработки объектов в потоке, но вам нужно научиться обрабатывать вторые два. Как указано выше, переход от Flux-> Mono может быть выполнен с помощью .collectList()
или также с .reduce(...)
. Переход от Mono-> Flux, по-видимому, обычно выполняется с .flatMapMany
или .flatMapIterable
или некоторым другим вариантом. Есть, вероятно, другие методы. Вы никогда не должны использовать .block()
в любом коде WebFlux, и, как правило, вы получите ошибку времени выполнения, если попытаетесь это сделать.
В вашем примере вы хотите от go до
- (моно-> флюс) -> (флюс-> флюс) -> (флюс-> флюс)
Как вы сказали, вы хотите
Вторая часть заключается в понимании цепочки потоков. Вы могли бы сделать
, который бы связал p1-> p2-> p3, но я всегда находил это более понятным вместо этого создать «Сервисный слой».
- o2 = p1 (объект);
- o3 = p2 (o2);
- result = p3 (o3) ;
Этот код намного легче читать и поддерживать, и с некоторой зрелостью вы понимаете ценность этого утверждения.
Единственная проблема, с которой я столкнулся на вашем примере делал Flux<String>
с WebClient
как @RequestBody
. Не работает См. WebClient bodyToFlux (String.class), чтобы список строк не разделял отдельные значения . Помимо этого, это довольно простое приложение. При отладке вы обнаружите, что он достигает строки .subscribe(System.out::println)
, прежде чем он попадет в строку Flux<Integer> ids = mapNamesToIds(fn)
. Это потому, что поток настроен до его выполнения. Требуется время, чтобы понять это, но в этом суть проекта реактора.
@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
Map<Integer, CartoonCharacter> characters;
@Override
public void run(ApplicationArguments args) throws Exception {
String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
characters = Arrays.asList( new CartoonCharacter[] {
new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"),
new CartoonCharacter(names[1].hashCode(), names[1], "Human"),
new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"),
new CartoonCharacter(names[3].hashCode(), names[3], "Fish")}
)
.stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
// TODO Auto-generated method stub
CartoonRequest cr = CartoonRequest.builder()
.cartoon("The Little Mermaid")
.characterNames(Arrays.asList(names))
.build();
thisLocalClient
.post()
.uri("cartoonDetails")
.body(Mono.just(cr), CartoonRequest.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class)
.subscribe(System.out::println);
}
@Bean
WebClient localClient() {
return WebClient.create("http://localhost:8080/demo/");
}
@Autowired
WebClient thisLocalClient;
@PostMapping("cartoonDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
Flux<Integer> ids = mapNamesToIds(fn);
Flux<CartoonCharacter> details = mapIdsToDetails(ids);
return details;
}
// Service Layer Methods
private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
return thisLocalClient
.post()
.uri("findIds")
.body(names, StringWrapper.class)
.retrieve()
.bodyToFlux(Integer.class);
}
private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
return thisLocalClient
.post()
.uri("findDetails")
.body(ids, Integer.class)
.retrieve()
.bodyToFlux(CartoonCharacter.class);
}
// Services
@PostMapping("findIds")
Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
return names.map(name->name.getString().hashCode());
}
@PostMapping("findDetails")
Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
return ids.map(characters::get);
}
}
Также:
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
private String string;
}
@Data
@Builder
public class CartoonRequest {
private String cartoon;
private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
Integer id;
String name;
String species;
}