Как объединить два, выполнить два потока реактора параллельно, которые возвращают списки и объединяют результаты - PullRequest
2 голосов
/ 16 июня 2020

У меня есть LegacyAccountDto, из которого мне нужно создать список из двух отдельных источников. Один - это локальный репозиторий JPA, а другой - вызов веб-службы. Версия веб-службы имеет доступный accountStatus, а источник данных JPA - нет. Мне нужно выполнить два вызова параллельно как Fluxes, а затем, когда они оба будут выполнены, мне нужно найти legacyId списка веб-сервисов и заполнить список значением accountStatus, полученным из веб-сервиса. Вся идея состоит в том, чтобы вернуть список с завершенным DTO. Мне не нужно сохранять его обратно в веб-службу или репозиторий JPA

DTO:

@Data
@JsonInclude(Include.NON_NULL)
public class LegacyAccountDto {
  private UUID id;
  private UUID organizationId;
  private String name;
  private String website;
  private Long legacyAccountId;
  private LocalDateTime legacyCreated;
  private String accountType;
  private String accountState;
}

Каждая функция в операторе слияния возвращает поток LegacyDTO

Flux<LegacyAccountDto> completed = Flux.merge(
      getLegacyAccountsFromSvc(accountNames),
      Flux.fromIterable(accountMapper.accountListToLegacyAccountDtoList(accountRepository.getAccountsByNames(accountNames).get()))
    )
    .parallel()
    .runOn(Schedulers.elastic())
    .???????((list1, list2) -> {
      list2.map(l2 -> {
        //find list1 by legacyId
        //set l2.accountStatus = l1.accountstatus
      })
      //return the completed list as a flux
    })

Я не уверен, какую функцию вызывать дальше, чтобы иметь доступ к обоим спискам и получить accountStatus из второго вызова и иметь возможность объединить его, чтобы он не возвращал тип параллельного потока, а чем просто поток LegacyDto

1 Ответ

2 голосов
/ 16 июня 2020

Вы можете сделать это так:

Mono<List<LegacyAccountDto>> firstMonoList = getLegacyAccountsFromSvc(accountNames).collectList();
Mono<List<EntityX> secondMonoList = accountMapper.accountListToLegacyAccountDtoList(accountRepository.getAccountsByNames(accountNames).get()).collectList();

Mono.zip(firstMonoList, secondMonoList)
.map(listTuple -> {
  // do the searching and map to list of dtos
  return resultingList;
})
.flatMapMany(Flux::fromIterable);

Не рекомендуется использовать блокировку вызовов базы данных в реактивном потоке. Если у вас есть возможность, вы можете добавить драйвер R2DBC и сделать все это реактивным.

...