Spring data couchbase, не может удалить документ по id с помощью реактивного программирования - PullRequest
0 голосов
/ 03 января 2019

Недавно мы решили использовать spring-webflux с couchbase в нашем проекте, и нам нужна помощь в том, как решить приведенный ниже сценарий использования в реактивном программировании

  1. Проверьте и сохраните запрос в базе данных Bucket1, (мы использовали javax.validation и подпружиненный ReactiveCouchbaseRepository.
  2. Позвоните во внешнюю службу (мы использовали веб-клиент для вызова API.

    • В случае успеха,

      • записать документ AUDIT в Bucket2.
      • Получить документ, который вставлен в Bucket1, и отправить его в ответ.
      • записать документ аудита в Bucket2
    • При сбое

      • записать документ AUDIT в Bucket2.
      • удалите документ, вставленный в BUCKET1, и сгенерируйте исключение.
      • записать документ аудита в Bucket2

Мы написали класс обслуживания и используем два класса хранилища для сохранения документов в couchbase и веб-клиент для вызова внешней службы.

Наша бизнес-логика метода класса обслуживания выглядит следующим образом.

    {

    //1. Validate the request and throw the error
    List<String> validationMessages = handler.validate(customerRequest);
    if (validationMessages != null && !validationMessages.isEmpty()) {
        return Mono.error(new InvalidRequestException("Invalid Request", validationMessages, null));
    }

    //generate the id, set it to the request and save it to BUCKET1
    String customerRequestId = sequenceGenerator.nextId(Sequence.CUSTOMER_ACCOUNT_ID);
    customerRequest.setcustomerRequestId(customerRequestId);
    customerRequestMono = bucket1Repository.save(customerRequest);


    //2. Call the external service using webclient
    externalServiceResponse = customerRequestWebClient.createCFEEnrollment(customerRequest);

    //2. Subscribe to the response and and on Success write audit to BUCKET2 , and onerror write audit to BUCKET2 , and delete the inserted documet from BUCKET1
    externalServiceResponse.subscribe(response -> {
        //Initialise the success audit bean and save
        //2.1 a) Write Audt to BUCKET2
        Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
         }, errorResp -> {
        //2.2 a) Write Audt to BUCKET2
        //Initialise the error audit bean and save
        Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);

        //2.2 b)Delete the inserted
        Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
    });

    //Get the loan account id and return the same
    finalResponse = bucket1Repository.findByCustomerId(customerId);
    return Mono.when(externalServiceResponse,customerRequestMono,finalResponse).then(finalResponse)
            .doOnSuccess(resp -> {
                try {
                    finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(resp));
                    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }

            })
            .doOnError(error -> {
                try {
                    finalMasterAudit.setServiceResponse(new ObjectMapper().writeValueAsString(error.getMessage()));
                    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(finalMasterAudit);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }

            });
}

Наблюдаемые нами проблемы пары

  1. Документ не сохраняется до тех пор, пока мы не подпишем его в некоторых случаях. Это ожидаемое поведение? Нужно ли нам подписываться на сохраняемый документ?
  2. Невозможно удалить документ в случае ошибки.
  3. Также я знаю, что не следовал приведенному выше чисто реактивному программированию. Помогите мне с любыми указателями эффективно написать код в реактив.

Пожалуйста, помогите нам с любыми указателями

1 Ответ

0 голосов
/ 03 января 2019

Взяв кусок кода выше:

externalServiceResponse.subscribe(response -> {
    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
     }, errorResp -> {
    Mono<CustomerAuditBean> auditResponse = bucket2Repository.save(cfeAudit);
    Mono<CustomerRequest> delCustomer = bucket1Repository.deleteByLoanAccountId(loanAccountId);
});

С ним связаны две проблемы реактивного программирования:

  1. Вы создаете Monos, на которые вы не подписаны,таким образом, они никогда не будут выполняться.
  2. В любом случае вы не должны создавать их в подписке, а вместо этого использовать flatMap или onErrorResume, чтобы связать их, предварительно подписавшись.

Что-то вроде этого должносделайте трюк (извините, я не проверял его, поэтому вам может потребоваться внести некоторые изменения):

externalServiceResponse
   // If something goes wrong then delete the inserted doc
   .onErrorResume(err -> bucket1Repository.deleteByLoanAccountId(loanAccountId))

   // Always want to save the audit regardless
   .then(bucket2Repository.save(cfeAudit))

   .subscribe();

Есть другие проблемы, которые нужно исправить в коде, например, похоже, что вы хотите flatMapнесколько моно вместе, прежде чем подписаться на финальный моно, но, надеюсь, это поможет вам начать.

...