Объединение множества потоков ReactiveX в один поток результатов - PullRequest
0 голосов
/ 08 июня 2018

Я пытаюсь понять ReactiveX, используя RxJava, но не могу понять всю идею Reactive.Мой случай таков:

У меня Task класс.У него есть метод perform(), который выполняет HTTP-запрос и получает ответ через метод executeRequest().Запрос может быть выполнен много раз (определено количество повторений).Я хочу получить все результаты executeRequest() и объединить их в Flowable поток данных, чтобы я мог вернуть этот Flowable в perform() метод.Поэтому в конце я хочу, чтобы мой метод возвратил все результаты запросов, которые мой Task выполнил.

executeRequest() возвращает Single, поскольку он выполняет только один запрос и может предоставить только один ответ или нет привсе (в случае тайм-аута).В perform() я создаю Flowable диапазон чисел для каждого повторения.Подписавшись на это Flowable, я выполняю запрос за повторение.Я дополнительно подписываюсь на каждый ответ Single для регистрации и сбора ответов в коллекции для последующего использования.Итак, теперь у меня есть набор Single с, как я могу объединить их в Flowable, чтобы вернуть его в perform()?Я пытался возиться с такими операторами, как merge(), но я не понимаю типы его параметров.

Я прочитал некоторые руководства в Интернете, но все они очень общие или не дают примеров в соответствии смой случай.

public Flowable<HttpClientResponse> perform() {

    Long startTime = System.currentTimeMillis();

    List<HttpClientResponse> responses = new ArrayList<>();
    List<Long> failedRepetitionNumbers = new ArrayList<>();

    Flowable.rangeLong(0, repetitions)
            .subscribe(repetition -> {
                logger.debug("Performing repetition {} of {}", repetition + 1, repetitions);

                Long currentTime = System.currentTimeMillis();

                if (durationCap == 0 || currentTime - startTime < durationCap) {

                    Single<HttpClientResponse> response = executeRequest(method, url, headers, body);

                    response.subscribe(successResult -> {
                                logger.info("Received response with code {} in the {}. repetition.", successResult
                                        .statusCode(), repetition + 1);
                                responses.add(successResult);
                            },
                            error -> {
                                logger.error("Failed to receive response from {}.", url);
                                failedRepetitionNumbers.add(repetition);
                            });
                    waitInterval(minInterval, maxInterval);
                } else {
                    logger.info("Reached duration cap of {}ms for task {}.", durationCap, this);
                }
            });

    return Flowable.merge(???);
}

и executeRequest()

private Single<HttpClientResponse> executeRequest(HttpMethod method, String url, LinkedMultiValueMap<String, String>
        headers, JsonNode body) {

    CompletableFuture<HttpClientResponse> responseFuture = new CompletableFuture<>();

    HttpClient client = vertx.createHttpClient();
    HttpClientRequest request = client.request(method, url, responseFuture::complete);
    headers.forEach(request::putHeader);
    request.write(body.toString());
    request.setTimeout(timeout);
    request.end();

    return Single.fromFuture(responseFuture);
}

1 Ответ

0 голосов
/ 08 июня 2018

Вместо подписки на каждое наблюдаемое (каждый HTTP-запрос) в вашем методе perform, просто продолжайте цеплять наблюдаемые, как это.Ваш код может быть уменьшен до чего-то вроде

...