Реакторные нумерованные веб-сервисы - PullRequest
0 голосов
/ 28 февраля 2019

Я использую реактор в весеннем проекте, где я должен называть paginated api.API возвращает что-то вроде этого:

{
  "last": false,
  "totalPages": 2,
  "totalElements": 4178,
  "sort": {
    "sorted": false,
    "unsorted": true
  },
  "first": false,
  "numberOfElements": 1178,
  "size": 3000,
  "number": 0
}

Теперь я пытаюсь достичь, используя webflux для вызова сервера до последнего == true.

Я не могу понять, каким будет правильный способ сделать это.

Что я получил так далеко, это:

Mono<UserInfo> firstUserInfo =  panelistService.getInactiveUserInfo(noOfDays, role, pageNo);


    Flux<User> listOfUsers = firstUserInfo.flatMap(fui ->{

        logger.info("ACCOUNT SERVICE - purgeCronJob - Getting first page of inactive panelists -  page {} total {} last {} panelists {}", pageNo,fui.getTotalNoOfPages(),fui.isLast(),fui.getUserContent().size()); 

        Mono<List<User>> firstListOfUsers = Mono.just(fui.getUserContent());

        if(fui.isLast()) {
            return firstListOfUsers;
        }

        pageNo++;
        int totalPageNo = fui.getTotalNoOfPages();

        for(int i = pageNo; i < totalPageNo; i++) {

            Mono<List<User>> lou = panelistService.getInactiveUserInfo(noOfDays, role, i).map(ui ->{
                logger.info("ACCOUNT SERVICE - purgeCronJob - Getting inactive panelists -  page {} total {} last {} panelists {}", pageNo,ui.getTotalNoOfPages(),ui.isLast(),ui.getUserContent().size()); 
                return ui.getUserContent();
            }); 
            firstListOfUsers.zipWith(lou);
        }

        return firstListOfUsers;

    }).flatMapMany(Flux::fromIterable);

    listOfUsers.subscribe();

Так что вместополучив последнее значение, я создаю все моно для каждой страницы и объединяю их все вместе.

Ответы [ 2 ]

0 голосов
/ 01 марта 2019

Вы могли бы создать поток в зависимости от количества totalPages, установив noOfDays и роль где-то снаружи, и получив totalSize из первого запроса:

Flux.range(0,totalSize)
   .map(pageNo -> panelistService.getInactiveUserInfo(noOfDays, role, pageNo)...
0 голосов
/ 01 марта 2019

Я нашел способ и надеюсь, что это поможет другим.Я создал метод, который получает неактивные учетные записи и рекурсивно вызвал этот метод.Разница в том, что теперь я возвращаю поток пользователей, и я Flux.merge мои рекурсивные вызовы.

Flux<User> getInactiveUserByPage(Integer noOfDays, String role, Integer pageNo){

    return  panelistService.getInactiveUserInfo(noOfDays, role, pageNo).flatMapMany(ui ->{

        logger.info("ACCOUNT SERVICE - getInactiveUserByPage - Getting inactive panelists -  page {} total {} last {} panelists {}", pageNo,ui.getTotalNoOfPages(),ui.isLast(),ui.getUserContent().size()); 

        Flux<User> users = Flux.fromIterable(ui.getUserContent());

        if(ui.isLast()) {
            return users;
        }

        Integer newPageNo  = pageNo+1;

        Flux<User> next =  getInactiveUserByPage(noOfDays, role, newPageNo).subscribeOn(Schedulers.elastic()).mergeWith(users);

        return next;
    });

}
...