Неблокирующая асинхронная обработка Java - как ограничить использование памяти? - PullRequest
3 голосов
/ 12 июня 2019

Я возвращаюсь на Java через несколько лет и с нетерпением ожидаю появления неблокирующей асинхронной поддержки в новом java.net.http.HttpClient и в AWS Java SDK 2.0 . Я слышал о концепциях Реактивного Программирования много лет назад в выступлениях на конференциях, но у меня не было большого шанса применить эти идеи на практике.

У меня есть проблема, которая, кажется, хорошо подходит для игры с этим стилем программирования: в основном я хочу скачать кучу файлов (скажем, 10 000) по HTTP и записать их обратно на S3.

Я использовал failsafe для реализации повторов для неблокирующих асинхронных http GET, и легко составить те, которые загружаются через асинхронный клиент S3 (см. Рисунок ниже).

Однако я не уверен, как правильно ограничить использование памяти программой: не существует механизма для применения противодавления и предотвращения исключения нехватки памяти, если файлы загружаются быстрее, чем записываются обратно в S3.

Я знаком с некоторыми традиционными блокирующими решениями этой проблемы - например, используйте семафор, чтобы ограничить число одновременных загрузок, или запишите загрузки в некоторую ограниченную очередь блокировки, из которой будут извлекаться потоки загрузки S3. Однако, если я собираюсь использовать такой механизм блокировки для применения противодавления, то это заставляет меня усомниться в преимуществе использования неблокирующего ввода-вывода.

Есть ли более идиоматический "реактивный" способ достижения той же цели?

import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class BackupClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(BackupClient.class);
    private final HttpClient httpClient = HttpClient.newBuilder().build();
    private final S3AsyncClient s3AsyncClient = S3AsyncClient.create();

    public runBackup(List<URI> filesToBackup) {
        List<CompletableFuture<PutObjectResponse>> futures = filesToBackup.stream()
                .map(backupClient::submitBackup)
                .collect(Collectors.toList());

        futures.forEach(CompletableFuture::join);
    }

    private CompletableFuture<PutObjectResponse> submitBackup(URI uri) {
        return sendAsyncWithRetries(uri, HttpResponse.BodyHandlers.ofString())
                .thenCompose(httpResponse -> s3AsyncClient.putObject(PutObjectRequest.builder()
                        .bucket("my-bucket")
                        .key(uri.toASCIIString())
                        .build(), AsyncRequestBody.fromString(httpResponse.body())));
    }


    private <T> CompletableFuture<HttpResponse<T>> sendAsyncWithRetries(URI uri, HttpResponse.BodyHandler<T> handler) {
        final HttpRequest request = HttpRequest.newBuilder()
                .uri(uri)
                .timeout(Duration.ofMinutes(2))
                .GET()
                .build();

        final var retryPolicy = new RetryPolicy<HttpResponse<T>>()
                .withMaxRetries(4)
                .withDelay(Duration.ofSeconds(1))
                .handleResultIf(response -> 200 != response.statusCode());

        return Failsafe.with(retryPolicy)
                .getStageAsync(context -> {
                    if (context.getAttemptCount() > 0) {
                        LOGGER.error("Retry " + context.getAttemptCount() + " for " + uri);
                    }
                    return this.httpClient.sendAsync(request, handler);
                });
    }
}

1 Ответ

3 голосов
/ 13 июня 2019

Так как вам нужно контролировать потребление ресурсов (памяти), то Semaphore - правильный инструмент для этой цели.А поскольку вы хотите использовать неблокирующие вычисления, все, что вам нужно, это асинхронный семафор.Популярные библиотеки (rxjava, реактивные потоки) используют внутренний асинхронный семафор для создания реактивных потоков, но не предлагают его в качестве отдельного класса.Когда абонент реактивного потока вызывает Flow.Subscription.request (n) , это эквивалентно Semaphore.release (n) .Однако аналог Semaphore.acquire () скрыт.Он вызывается издателем изнутри.

Недостатком такого дизайнерского решения является то, что обратная связь по ресурсам может быть установлена ​​только между производителем и его ближайшим потребителем.Если существует цепочка производителей и потребителей, то потребление ресурсов каждой ссылки должно контролироваться отдельно, и общее потребление ресурсов становится в N раз больше, где N - количество ссылок.

Если вы можете себе это позволить, тогда вы можете использовать rxjava или любую другую реализацию библиотеки реактивных потоков.Если нет, то вы должны использовать единственную асинхронную библиотеку, которая позволяет пользователю полностью получить доступ к реализации асинхронного семафора : DF4J (да, я автор).Он не содержит прямого решения вашей проблемы, но содержит пример, в котором асинхронный сетевой сервер ограничивает количество одновременных соединений с помощью асинхронного семафора, см. ConnectionManager.java .

...