Я недавно начал использовать WebFlux и мне нужно предложение о том, как объединить несколько сервисов и объединить ответы на пути.4 службы и их POJO-ответ похожи на следующий пример:
class Response1{
String a1;
String a2;
}
class Response2{
String b1;
}
class Response3{
String c1;
}
class Response4{
String d1;
}
и сигнатура 4-х служб:
Flux<Response1> service1();
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)
Так что service2 необходимо вызывать для каждого Response1 в Flux, иservice3 для каждого ответа2.Отношения между моделями:
Response1 <1-----*>Response2 (1 to many),
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)
Совокупный окончательный ответ должен выглядеть следующим образом (JSON):
[
{
"a1": "",
"a2": "",
"d1": "",
"response2s": [
{
"b1": "",
"response3s": [
{
"c1": ""
}
]
}
]
}
]
Итак, сначала мне нужно вызвать Service1, а затем вызвать service2 для каждого Response1, затем вызватьservice3 для каждого Response2 (возвращается service2).Кроме того, вызов service4 для каждого ответа1, возвращаемого service1 (может вызываться параллельно вызовам service2 и service3).Чтобы обновить Агрегированный окончательный ответ, я добавил два дополнительных POJO для хранения дочерних ответов, например (соответствующие биты):
public class AggResponse extends Response1{
List<AggResponse2> response2s;// populated from service2 response
String d1; // populated from service4 response
public void add(AggResponse2 res2){
if(response2s == null)
response2s = new ArrayList<>();
response2s.add(res2);
}
}
и
public class AggResponse2 extends Response2{
List<Response3> response3s;// populated from service3 response
public void add(Response3 res3) {
if (response3s == null)
response3s = new ArrayList<>();
response3s.add(res3);
}
}
Как лучше всего сделать цепочкучтобы я сохранил предыдущие данные ответа и при объединении операторов сохранил все данные в объекте AggResponse?Я попытался сделать следующее:
public Flux<AggResponse> aggregate() {
return services.service1()
.map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
.flatMap(aggRes -> services.service2(aggRes.getA1())
.map(res2 -> {
AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
aggRes.add(aggRes2);
return aggRes2;
})
.flatMap(aggRes2 -> services.service3(aggRes2.getB1())
.map(res3 -> {
aggRes2.add(res3);
return res3;
})
.reduce(aggRes2, (a, aggRes3) -> aggRes2)
)
.reduce(aggRes, (a, aggRes2) -> aggRes)
)
.flatMap(aggRes -> services.service4(aggRes.getA1())
.map(res4 -> {
aggRes.setD1(res4.getD1());
return aggRes;
})
);
}
, однако я получаю следующий неполный ответ:
[ {
"a1" : "a1v1",
"a2" : "a2v1"
} ]
Я вижу, что все службы вызываются при просмотре журналов.Два вопроса: 1. почему не видно агрегированного ответа, можно ли его уменьшить, потеряв его?2. есть ли лучший подход для достижения этого?