Последовательные асинхронные вызовы, основанные на проверках состояния в реакторе - PullRequest
0 голосов
/ 26 октября 2018

Здесь я пытаюсь выполнять асинхронные и неблокирующие вызовы с использованием реактора, и для каждого запроса мне, возможно, придется вызывать две службы последовательно (в моем случае ниже, getAccountInfoFromAAA и getAccountInfoFromBBB).

Вот мой ItemRequest объект:

public class ItemRequest {
    private Account account;
    private Result firstServiceResult;
    private Result secondServiceResult;
    private PostingParameterCode postingParameterCode; //enum 
    //...
    //...
    //getters and setters
}

Итак, мой запрос будет содержать несколько itemRequest с, и для каждого itemRequest я выполняю асинхронные вызовы как:

public void getAccountData(List<ItemRequest> itemRequests) {
    ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
    Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
}

public Mono<ItemRequest> callBothSors(ItemRequest itemRequest) {
    return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest); 
    //here, it will enter into a sequential call for each itemRequest
}

Это мой первый интерфейс служебного вызова:

public Mono<ItemRequest> getAccountDataFromAAA(ItemRequest itemRequest);

Это мой второй интерфейс служебного вызова:

public Mono<ItemRequest> getAccountDataFromBBB(ItemRequest itemRequest);

Этот метод будет иметь до двух последовательных вызовов на основеусловие:

public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
    Mono<ItemRequest> firstCallResult = Mono.empty();
    Mono<ItemRequest> secondCallResult = Mono.empty();

if(isFirstServiceCallRequired(itemRequest)){
    firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest); 
//basically, firstService call will update the accountKey information and
//will also set the result status to OK which is required to decide 
//whether to make secondService call.
} else {
    //Account key is already present, so just update the result status which I need later.
    Result result = new Result();
    result.setStatus(Result.Status.OK);
    result.setMessageText("First call not required as account info is set for item request");
    itemRequest.setFirstServiceResult(result);
}

//Now, before calling the second service, I need to check the following:

if(null!= itemRequest.getFirstServiceResult() && 
    itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) && 
    itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)){ 
        secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
    }

    return firstCallResult.then(secondCallResult);  //attaching the
    //firstCallResult and secondCallResult to produce a single Mono

}

Работает нормально, когда firstCallResult не требуется.Но когда требуется первый вызов, эта проверка состояния не пройдет, так как у меня не будет обновлен объект результата первого вызова:

if(null != itemRequest.getFirstServiceResult() && 
    itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) && 
    itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT))) { ... } 
 //this condition check will not pass because first service call is not actually executing

Оба случая работают нормально, если я добавлю следующее утверждение:

if(isFirstServiceCallRequired(itemRequest)){
        firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest); 
        firstCallResult.block(); //adding this case will work on both cases 
    }

Но я не думаю, что получу выгоду от реакторов таким образом.Я думал о такой логике:

Mono<ItemRequest> result = firstService.call(...)
    .doOnNext(/*do something */)
    .then( ... secondService.call())

Но я не мог найти способ связать secondService с firstService, чтобы получить моно-результат и те же проверки условий.Проверка состояния важна, так как я не всегда хочу выполнить вторую услугу.Есть ли способ связать secondService с firstService для получения результата и проверки этих условий?

Извинения за длинный вопрос.Любые предложения / помощь будет принята с благодарностью.

Ответы [ 3 ]

0 голосов
/ 02 ноября 2018

После предложения щедрых баллов на этот вопрос я был действительно взволнован и ожидал некоторых ответов.Но в любом случае, я могу улучшить свое первоначальное решение, и у меня тоже есть эти проверки условий.

Я сделал следующее: я изменил тип возврата с Mono<ItemRequest> на Mono<Void> в обоих вызовах службы, так как я в основномобновление данных до ItemRequest список:

Обработка параллельного вызова здесь (каждый параллельный вызов имеет последовательный вызов):

public void getAccountData(List<ItemRequest> itemRequests) {
        ImmutableList<ItemRequest> list = ImmutableList.copyOf(itemRequests);
        Flux.fromIterable(list).flatMap(this::callBothSors).blockLast();
    }

    public Mono<Void> callBothSors(ItemRequest itemRequest) {
        return getAccountDataService.getAccountDataFromAAAandBBB(itemRequest);
        //here, it will enter into a sequential call for each itemRequest
    }

, и это мои firstServiceCall иsecondServiceCall изменения интерфейса:

public Mono<Void> getAccountDataFromAAA(ItemRequest itemRequest);

public Mono<Void> getAccountDataFromBBB(ItemRequest itemRequest);

и я связал secondServiceCall с firstServiceCall, чтобы получить моно-результат и получить эти проверки условий также как:

public Mono<Void> getAccountDataFromAAAandBBB(ItemRequest itemRequest){
    Mono<Void> callSequence = Mono.empty();

    if(isFirstServiceCallRequired(itemRequest)){
        callSequence = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    } else {
        //Account key is already present, so just update the result status which I need later.
        Result result = new Result();
        result.setStatus(Result.Status.OK);
        result.setMessageText("First call not required as account info is set for item request");
        itemRequest.setFirstServiceResult(result);
    }

    return callSequence.thenEmpty(Mono.defer(() -> {
        //note: Mono.defer ==>> Create a Mono provider that will supply a target Mono to subscribe to 
        //for each subscriber downstream.
        //only if the firstServiceCall result is successful & other condition check successful,
        // I am calling secondServiceCall:  
        if(shouldCallSecondService(itemRequest)){
            return this.secondServiceCallImpl.getAccountDataFromAAAandBBB(itemRequest);
        } else {
            return Mono.empty();
        }
    }))
0 голосов
/ 05 ноября 2018
public Mono<ItemRequest> getAccountDataFromAAAandBBB(ItemRequest itemRequest) {
  Mono<ItemRequest> firstCallResult = Mono.empty();
  Mono<ItemRequest> secondCallResult = Mono.empty();

  if (isFirstServiceCallRequired(itemRequest)) {
    firstCallResult = this.firstServiceCallImpl.getAccountDataFromAAA(itemRequest);
    //basically, firstService call will update the accountKey information and
    //will also set the result status to OK which is required to decide
    //whether to make secondService call.
  } else {
  /*Account key is already present, so just update the result status which I need 
  later.*/
    firstCallResult = Mono.defer(() -> {
      Result result = new Result();
      result.setStatus(Result.Status.OK);
      result.setMessageText("First call not required as account info is set for item request");
      itemRequest.setFirstServiceResult(result);
      return Mono.just(itemRequest);
    });
  }

  return firstCallResult.flatMap(itReq -> {
    //Now, before calling the second service, I need to check the following:
    if (null != itemRequest.getFirstServiceResult() &&
        itemRequest.getFirstServiceResult().getStatus().equals(Result.Status.OK) &&
      itemRequest.getPostingParameterCode().equals(PostingParameterCode.MOBILECREDIT)) {
        return secondCallResult = this.secondServiceCallImpl.getAccountDataFromBBB(itemRequest);
  } else {
    return itReq;
  }
  });
}

Следующий простой пример поможет вам с flatMap пониманием:

public static void main(String[] args) {

  callExternalServiceA.flatMap(response -> {
    if(response.equals("200")){
      return Mono.just(response);
    } else {
      return callExtertnalServiceB();
    }
  }).block();

}

public static Mono<String> callExtertnalServiceA() {
  return Mono.defer(() -> {
    System.out.println("Call external service A");
    return Mono.just("400");
  });
}

public static Mono<String> callExtertnalServiceB() {
  return Mono.defer(() -> {
    System.out.println("Call external service B");
    return Mono.just("200");
  });
}
0 голосов
/ 01 ноября 2018

Вот несколько новостей: Реактор - это не серебряная пуля! :)

Всякий раз, когда вам нужен ответ на звонок, чтобы определить, нужно ли вам делать что-то еще, это никогда не сможет быть полностью распараллелено. Например. Вы всегда можете сделать последнее предложение. Однако это не означает, что использование Reactor не дает вам никаких преимуществ!

Некоторые из преимуществ, которые вы получаете:

  • Вы используете Netty под капотом вместо сервлета, что помогает избежать блокировки операций ввода-вывода. Это может привести к лучшему распределению ресурсов, делая вашу систему более устойчивой.
  • Вы можете выполнять другие операции в ожидании ответа. Если у вас есть дела, где порядок не имеет значения, вы всегда можете поместить их туда (например, аудит, ведение журнала и т. Д.).

Надеюсь, это ответит на ваш вопрос:)

...