Spring WebFlux - как получить данные из БД для использования на следующем шаге - PullRequest
2 голосов
/ 27 апреля 2020

Я использую Spring WebFlux (Project Reactor) и сталкиваюсь со следующей проблемой: мне нужно получить некоторые данные из db, чтобы использовать их для вызова другого сервиса - все в одном потоке. Как это сделать?

public Mono<MyObj> saveObj(Mono<MyObj> obj) {
    return obj
        .flatMap(
            ob->
                Mono.zip(
                        repo1.save(
                          ...),
                        repo2
                            .saveAll(...)
                            .collectList(),
                        repo3
                            .saveAll(...)
                            .collectList())
                    .map(this::createSpecificObject))
        .doOnNext(item-> createObjAndCallAnotherService(item));
  }



private void createObjAndCallAnotherService(Prot prot){
myRepository
        .findById(
            prot.getDomCred().stream()
                .filter(Objects::nonNull)
                .findFirst()
                .map(ConfDomCred::getCredId)
                .orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")))
        .doOnNext( //one value is returned from myRepository -> Flux<MyObjectWithNeededData>
            confCred-> {//from this point the code is unreachable!!! - why????
              Optional<ConfDomCred> confDomCred=
                  prot.getDomCreds().stream().filter(Objects::nonNull).findFirst();

              confDomCred.ifPresent(
                  domCred -> {
                    ProtComDto com=
                        ProtComDto.builder()
                            .userName(confCred.getUsername())
                            .password(confCred.getPassword())                          
                            .build();
                    clientApiToAnotherService.callEndpintInAnotherService(com); //this is a client like Feign that invokes method in another service
                  });
            });
}

ОБНОВЛЕНИЕ

Когда я вызываю

    Flux<MyObj> myFlux =  myRepository
            .findById(
                prot.getDomCred().stream()
                    .filter(Objects::nonNull)
                    .findFirst()
                    .map(ConfDomCred::getCredId)
                    .orElse(UUID.fromString("00000000-0000-0000-0000-000000000000")));

myFlux.subscribe(e -> e.getPassword()) 

, тогда значение печатается

UPDATE2

Итак, подведем итоги - я думаю, что приведенный ниже код является асинхронным / неблокирующим - я прав? В моем

ProtectionCommandService

мне пришлось дважды использовать подписку () - только тогда я могу вызвать другой сервис и сохранить их мой объект: commandControllerApi.createNewCommand

public Mono<Protection> saveProtection(Mono<Protection> newProtection) {
    return newProtection.flatMap(
        protection ->
            Mono.zip(
                    protectorRepository.save(//some code),
                    domainCredentialRepository
                        .saveAll(//some code)
                        .collectList(),
                    protectionSetRepository
                        .saveAll(//some code)
                        .collectList())
                .map(this::createNewObjectWrapper)
                .doOnNext(protectionCommandService::createProtectionCommand));
  }

Класс ProtectionCommandService:

public class ProtectionCommandService {

  private final ProtectionCommandStrategyFactory protectionCommandFactory;
  private final CommandControllerApi commandControllerApi;

  public Mono<ProtectionObjectsWrapper> createProtectionCommand(
      ProtectionObjectsWrapper protection) {
    ProductType productType = protection.getProtector().getProductType();

    Optional<ProtectionCommandFactory> commandFactory = protectionCommandFactory.get(productType);

    commandFactory
        .get()
        .createCommandFromProtection(protection)
        .subscribe(command -> commandControllerApi.createNewCommand(command).subscribe());
    return Mono.just(protection);
  }
}

И одна из двух фабрик:

@Component
@AllArgsConstructor
@Slf4j
public class VmWareProtectionCommandFactory implements ProtectionCommandFactory {

  private static final Map<ProductType, CommandTypeEnum> productTypeToCommandType =
      ImmutableMap.of(...//some values);

  private final ConfigurationCredentialRepository configurationCredentialRepository;

  @Override
  public Mono<CommandDetails> createCommandFromProtection(ProtectionObjectsWrapper protection) {
    Optional<DomainCredential> domainCredential =
        protection.getDomainCredentials().stream().findFirst();

    return configurationCredentialRepository
        .findByOwnerAndId(protection.getOwner(), domainCredential.get().getCredentialId())
        .map(credential -> createCommand(protection, credential, domainCredential.get()));
  }

и метод createCommand возвращает объект Mono в результате этой фабрики.

private Mono<CommandDetails> createCommand(Protection protection
     //other parameters) {

    CommandDto commandDto =
        buildCommandDto(protection, confCredential, domainCredentials);

    String commands = JsonUtils.toJson(commandDto);
    CommandDetails details = new CommandDetails();
    details.setAgentId(protection.getProtector().getAgentId().toString());
    details.setCommandType(///some value);
    details.setArguments(//some value);
    return Mono.just(details);

UPDATE3

Мой основной метод, который вызывает все, немного изменился:

public Mono<MyObj> saveObj(Mono<MyObj> obj) {
    return obj
        .flatMap(
            ob->
                Mono.zip(
                        repo1.save(
                          ...),
                        repo2
                            .saveAll(...)
                            .collectList(),
                        repo3
                            .saveAll(...)
                            .collectList())
.map(this::wrapIntoAnotherObject)
.flatMap(protectionCommandService::createProtectionCommand)
.map(this::createMyObj));

1 Ответ

4 голосов
/ 01 мая 2020

Прекратить разрыв цепочки

Это чистая функция , она возвращает что-то и всегда возвращает то же самое, что бы мы ни давали. У него нет побочного эффекта.

public Mono<Integer> fooBar(int number) {
    return Mono.just(number);
}

мы можем вызвать его и включить, потому что он что-то возвращает.

foobar(5).flatMap(number -> { ... }).subscribe();

Это не чистая функция, мы не можем включить Мы разрываем цепь . Мы не можем подписаться, и ничего не произойдет, пока мы не подпишемся .

public void fooBar(int number) {
    Mono.just(number)
}

fooBar(5).subscribe(); // compiler error

, но я хочу функцию void, я хочу, я хочу, я хочу .... wuuaaa wuaaaa

Нам всегда нужно что-то возвращать, чтобы мы могли запустить следующую часть в цепочке. Как еще программа узнает, когда запустить следующий раздел? Но допустим, мы хотим проигнорировать возвращаемое значение и просто вызвать следующую часть. Ну, тогда мы можем вернуть Mono<Void>.

public Mono<Void> fooBar(int number) {
    System.out.println("Number: " + number);
    return Mono.empty();
}

foobar(5).subscribe(); // Will work we have not broken the chain

ваш пример:

private void createObjAndCallAnotherService(Prot prot){
    myRepository.findById( ... ) // breaking the chain, no return
}

И некоторые другие советы:

  • Правильно не называйте ваши объекты MyObj и saveObj, myRepository
  • Избегайте длинных имен createObjAndCallAnotherService
  • Следуйте одиночной ответственности createObjAndCallAnotherService это делает 2 вещи, отсюда и имя.
  • Создавайте приватные функции или вспомогательные функции, чтобы сделать ваш код более читабельным, не вставляйте все подряд.

ОБНОВЛЕНИЕ

Вы по-прежнему совершаете ту же ошибку.

commandFactory // Here you are breaking the chain because you are ignoring the return type
    .get()
    .createCommandFromProtection(protection)
    .subscribe(command -> commandControllerApi.createNewCommand(command)
.subscribe()); // DONT SUBSCRIBE you are not the consumer, the client that initiated the call is the subscriber
return Mono.just(protection);

Что вы хотите сделать:

return commandFactory.get()
    .createCommandFrom(protection)
    .flatMap(command -> commandControllerApi.createNewCommand(command))
    .thenReturn(protection);

Прекратите разрывать цепочку и не подписывайтесь, если ваша услуга не является конечным потребителем или инициатором вызова.

...