Я возвращаюсь на Java через несколько лет и с нетерпением ожидаю появления неблокирующей асинхронной поддержки в новом java.net.http.HttpClient и в AWS Java SDK 2.0 . Я слышал о концепциях Реактивного Программирования много лет назад в выступлениях на конференциях, но у меня не было большого шанса применить эти идеи на практике.
У меня есть проблема, которая, кажется, хорошо подходит для игры с этим стилем программирования: в основном я хочу скачать кучу файлов (скажем, 10 000) по HTTP и записать их обратно на S3.
Я использовал failsafe для реализации повторов для неблокирующих асинхронных http GET, и легко составить те, которые загружаются через асинхронный клиент S3 (см. Рисунок ниже).
Однако я не уверен, как правильно ограничить использование памяти программой: не существует механизма для применения противодавления и предотвращения исключения нехватки памяти, если файлы загружаются быстрее, чем записываются обратно в S3.
Я знаком с некоторыми традиционными блокирующими решениями этой проблемы - например, используйте семафор, чтобы ограничить число одновременных загрузок, или запишите загрузки в некоторую ограниченную очередь блокировки, из которой будут извлекаться потоки загрузки S3. Однако, если я собираюсь использовать такой механизм блокировки для применения противодавления, то это заставляет меня усомниться в преимуществе использования неблокирующего ввода-вывода.
Есть ли более идиоматический "реактивный" способ достижения той же цели?
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class BackupClient {
private static final Logger LOGGER = LoggerFactory.getLogger(BackupClient.class);
private final HttpClient httpClient = HttpClient.newBuilder().build();
private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();
public runBackup(List<URI> filesToBackup) {
List<CompletableFuture<PutObjectResponse>> futures = filesToBackup.stream()
.map(backupClient::submitBackup)
.collect(Collectors.toList());
futures.forEach(CompletableFuture::join);
}
private CompletableFuture<PutObjectResponse> submitBackup(URI uri) {
return sendAsyncWithRetries(uri, HttpResponse.BodyHandlers.ofString())
.thenCompose(httpResponse -> s3AsyncClient.putObject(PutObjectRequest.builder()
.bucket("my-bucket")
.key(uri.toASCIIString())
.build(), AsyncRequestBody.fromString(httpResponse.body())));
}
private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetries(URI uri, HttpResponse.BodyHandler<T> handler) {
final HttpRequest request = HttpRequest.newBuilder()
.uri(uri)
.timeout(Duration.ofMinutes(2))
.GET()
.build();
final var retryPolicy = new RetryPolicy<HttpResponse<T>>()
.withMaxRetries(4)
.withDelay(Duration.ofSeconds(1))
.handleResultIf(response -> 200 != response.statusCode());
return Failsafe.with(retryPolicy)
.getStageAsync(context -> {
if (context.getAttemptCount() > 0) {
LOGGER.error("Retry " + context.getAttemptCount() + " for " + uri);
}
return this.httpClient.sendAsync(request, handler);
});
}
}