Я пытаюсь понять ReactiveX, используя RxJava, но не могу понять всю идею Reactive.Мой случай таков:
У меня Task
класс.У него есть метод perform()
, который выполняет HTTP-запрос и получает ответ через метод executeRequest()
.Запрос может быть выполнен много раз (определено количество повторений).Я хочу получить все результаты executeRequest()
и объединить их в Flowable
поток данных, чтобы я мог вернуть этот Flowable
в perform()
метод.Поэтому в конце я хочу, чтобы мой метод возвратил все результаты запросов, которые мой Task
выполнил.
executeRequest()
возвращает Single
, поскольку он выполняет только один запрос и может предоставить только один ответ или нет привсе (в случае тайм-аута).В perform()
я создаю Flowable
диапазон чисел для каждого повторения.Подписавшись на это Flowable
, я выполняю запрос за повторение.Я дополнительно подписываюсь на каждый ответ Single
для регистрации и сбора ответов в коллекции для последующего использования.Итак, теперь у меня есть набор Single
с, как я могу объединить их в Flowable
, чтобы вернуть его в perform()
?Я пытался возиться с такими операторами, как merge()
, но я не понимаю типы его параметров.
Я прочитал некоторые руководства в Интернете, но все они очень общие или не дают примеров в соответствии смой случай.
public Flowable<HttpClientResponse> perform() {
Long startTime = System.currentTimeMillis();
List<HttpClientResponse> responses = new ArrayList<>();
List<Long> failedRepetitionNumbers = new ArrayList<>();
Flowable.rangeLong(0, repetitions)
.subscribe(repetition -> {
logger.debug("Performing repetition {} of {}", repetition + 1, repetitions);
Long currentTime = System.currentTimeMillis();
if (durationCap == 0 || currentTime - startTime < durationCap) {
Single<HttpClientResponse> response = executeRequest(method, url, headers, body);
response.subscribe(successResult -> {
logger.info("Received response with code {} in the {}. repetition.", successResult
.statusCode(), repetition + 1);
responses.add(successResult);
},
error -> {
logger.error("Failed to receive response from {}.", url);
failedRepetitionNumbers.add(repetition);
});
waitInterval(minInterval, maxInterval);
} else {
logger.info("Reached duration cap of {}ms for task {}.", durationCap, this);
}
});
return Flowable.merge(???);
}
и executeRequest()
private Single<HttpClientResponse> executeRequest(HttpMethod method, String url, LinkedMultiValueMap<String, String>
headers, JsonNode body) {
CompletableFuture<HttpClientResponse> responseFuture = new CompletableFuture<>();
HttpClient client = vertx.createHttpClient();
HttpClientRequest request = client.request(method, url, responseFuture::complete);
headers.forEach(request::putHeader);
request.write(body.toString());
request.setTimeout(timeout);
request.end();
return Single.fromFuture(responseFuture);
}