Цепочка WebFlux для вызова нескольких сервисов и агрегации ответов - PullRequest
0 голосов
/ 06 февраля 2019

Я недавно начал использовать WebFlux и мне нужно предложение о том, как объединить несколько сервисов и объединить ответы на пути.4 службы и их POJO-ответ похожи на следующий пример:

class Response1{
   String a1;
   String a2;
}

class Response2{
   String b1;
}

class Response3{
   String c1;
}

class Response4{
   String d1;
}

и сигнатура 4-х служб:

Flux<Response1> service1(); 
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)

Так что service2 необходимо вызывать для каждого Response1 в Flux, иservice3 для каждого ответа2.Отношения между моделями:

Response1 <1-----*>Response2 (1 to many), 
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)

Совокупный окончательный ответ должен выглядеть следующим образом (JSON):

[
  {
    "a1": "",
    "a2": "",
    "d1": "",
    "response2s": [
      {
        "b1": "",
        "response3s": [
          {
            "c1": ""
          }
        ]
      }
    ]
  }
]

Итак, сначала мне нужно вызвать Service1, а затем вызвать service2 для каждого Response1, затем вызватьservice3 для каждого Response2 (возвращается service2).Кроме того, вызов service4 для каждого ответа1, возвращаемого service1 (может вызываться параллельно вызовам service2 и service3).Чтобы обновить Агрегированный окончательный ответ, я добавил два дополнительных POJO для хранения дочерних ответов, например (соответствующие биты):

public class AggResponse extends Response1{
    List<AggResponse2> response2s;// populated from service2 response
    String d1; // populated from service4 response

    public void add(AggResponse2 res2){
        if(response2s == null)
            response2s = new ArrayList<>();
        response2s.add(res2);
    }
}

и

public class AggResponse2 extends Response2{
    List<Response3> response3s;// populated from service3 response

    public void add(Response3 res3) {
        if (response3s == null)
            response3s = new ArrayList<>();
        response3s.add(res3);
    }
}

Как лучше всего сделать цепочкучтобы я сохранил предыдущие данные ответа и при объединении операторов сохранил все данные в объекте AggResponse?Я попытался сделать следующее:

public Flux<AggResponse> aggregate() {
    return services.service1()
            .map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
            .flatMap(aggRes -> services.service2(aggRes.getA1())
                    .map(res2 -> {
                        AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
                        aggRes.add(aggRes2);
                        return aggRes2;
                    })
                    .flatMap(aggRes2 -> services.service3(aggRes2.getB1())
                            .map(res3 -> {
                                aggRes2.add(res3);
                                return res3;
                            })
                            .reduce(aggRes2, (a, aggRes3) -> aggRes2)
                    )
                    .reduce(aggRes, (a, aggRes2) -> aggRes)
            )
            .flatMap(aggRes -> services.service4(aggRes.getA1())
                    .map(res4 -> {
                        aggRes.setD1(res4.getD1());
                        return aggRes;
                    })
            );
}

, однако я получаю следующий неполный ответ:

[ {
  "a1" : "a1v1",
  "a2" : "a2v1"
} ]

Я вижу, что все службы вызываются при просмотре журналов.Два вопроса: 1. почему не видно агрегированного ответа, можно ли его уменьшить, потеряв его?2. есть ли лучший подход для достижения этого?

1 Ответ

0 голосов
/ 07 февраля 2019

Вы можете использовать метод merge, если не хотите ждать next сигнала *1003* для вашего service4.Примерно так:

return service1().flatMap(response1 ->
        Flux.merge(service23Agg(response1.a1), service4Agg(response1.a2))
                .reduce((aggResponse, aggResponse2) -> new AggResponse(
                        response1.a1,
                        response1.a2,
                        Optional.ofNullable(aggResponse.d1)
                                .orElse(aggResponse2.d1),
                        Optional.ofNullable(aggResponse.response2s)
                                .orElse(aggResponse2.response2s))));

Классы и методы служебных программ:

class AggContainer {
    final String b1;
    final List<Response3> response3s;

    AggContainer(String b1, List<Response3> response3s) {
        this.b1 = b1;
        this.response3s = response3s;
    }
}

class AggResponse {
    final String a1;
    final String a2;
    final String d1;
    final List<AggContainer> response2s;

    AggResponse(String a1, String a2, String d1, List<AggContainer> response2s) {
        this.a1 = a1;
        this.a2 = a2;
        this.d1 = d1;
        this.response2s = response2s;
    }

    AggResponse(String d1) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = d1;
        this.response2s = null;
    }

    AggResponse(List<AggContainer> response2s) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = null;
        this.response2s = response2s;
    }
}

private Mono<AggResponse> service23Agg(String a1) {
    return service2(a1).flatMap(response2 -> service3(response2.b1).collectList()
            .map(response3s -> new AggContainer(response2.b1, response3s)))
            .collectList()
            .map(AggResponse::new);
}

private Mono<AggResponse> service4Agg(String a2) {
    return service4(a2).map(response4 -> new AggResponse(response4.d1));
}

И вы должны быть очень осторожны с изменяемыми коллекциями в асинхронной среде.Избегайте менять его внутри реактивного трубопровода.

...