Реактор отложенной пагинации с расширением - PullRequest
0 голосов
/ 29 января 2019

На основе Как собирать API-ответы с разбивкой по страницам с помощью весенней загрузки WebClient?

Я создал следующий класс искателя

class GitlabCrawler(private val client: WebClient, private val token: String) {

    fun fetchCommits(project: URI): Flux<Commit> {
        return fetchCommitsInternal(project).expand { cr: ClientResponse? ->
                val nextUrl = getNextUrl(cr)

                nextUrl?.let { fetchCommitsInternal(URI.create(it)) }
                        ?: Mono.empty<ClientResponse>()
        }.limitRate(1)
                .flatMap { cr: ClientResponse? -> cr?.bodyToFlux(Commit::class.java) ?: Flux.empty() }


    }

    private fun getNextUrl(cr: ClientResponse?):String? {
        // TODO replace with proper link parsing
        return cr?.headers()?.header(HttpHeaders.LINK)?.firstOrNull()
                ?.splitToSequence(",")
                ?.find { it.endsWith("rel=\"next\"") }
                ?.let { it.substring(it.indexOf('<') + 1, it.lastIndexOf('>')) }
    }

    private fun fetchCommitsInternal(url: URI): Mono<ClientResponse> {
        return client.get()
                .uri(url)
                .accept(MediaType.APPLICATION_JSON_UTF8)
                .header("Private-Token", token)
                .exchange()
    }
}


data class Commit(
        val id: String,
        val message: String,
        @JsonProperty("parent_ids") val parentIds: List<String>,
        @JsonProperty("created_at") val createdAt: String)

Я бы хотел избежать ненужныхзапрос, но он выполняет больше запроса, чем необходимо для его выполнения.

gitlabCrawler.fetchCommits(URI.create("https://...")).take(15).collectList().block()

Потребуется только один запрос, поскольку каждая страница содержит 20 записей, но он запускает запрос второй страницы.Кажется, всегда запрашивать еще одну страницу, чем необходимо.Я пытался использовать limitRate, но это, похоже, не дает эффекта.

Есть ли способ сделать его ленивым, то есть запрашивать следующую страницу только тогда, когда ток исчерпан?

1 Ответ

0 голосов
/ 31 января 2019

Вы уверены, что он действительно выполняет запрос?fetchCommitInternal вызывается означает, что WebFlux «подготовил» запрос, не обязательно, чтобы он был выполнен (т.е. подписан).

Следующее упрощение вашего варианта использования показывает разницу:

private static Tuple2<Integer, Flux<Integer>> nextPage(int index, int pageSize) {
    System.out.println("prepared a request for page " + index);
    return Tuples.of(index, Flux.range((pageSize * (index - 1)) + 1, pageSize));
}

@Test
public void expandLimitedRequest() {
    int pageSize = 5;
    Flux.just(nextPage(1, pageSize))
        .doOnSubscribe(sub -> System.out.println("requested first page"))
        .expand(page -> {
            int currentPage = page.getT1();
            if (currentPage < 3) {
                int nextPage = currentPage + 1;
                return Flux.just(nextPage(nextPage, pageSize))
                           .doOnSubscribe(sub -> System.out.println("requested page " + nextPage));
            }
            return Flux.empty();
        })
        .doOnNext(System.out::println)
        .flatMap(Tuple2::getT2)
        .doOnNext(System.out::println)
        .take(8)
        .blockLast();
}

Отпечатки:

prepared a request for page 1
requested first page
[1,FluxRange]
1
2
3
4
5
prepared a request for page 2
requested page 2
[2,FluxRange]
6
7
8
prepared a request for page 3

Как видите, он подготавливает запрос для страницы 3, но никогда не выполняет его (потому что take нисходящий поток отменяет expand до этого).

...