Управление памятью в цепочке CompletableFutures - PullRequest
0 голосов
/ 08 мая 2019

Я пишу асинхронный клиент. Каждый раз, когда отправляется запрос, клиент создает, регистрируется внутри, а затем возвращает CompletableFuture, который будет завершен после получения ответа сервера на запрос отправки.

Проблема в том, что некоторые запросы никогда не получают ответ, и клиентский код отказывается от них после некоторого времени ожидания. И так, я получаю несколько зарегистрированных CompletableFuture s, на которые клиентский код больше не содержит ссылки, и поэтому должен иметь право на сборку мусора.

Чтобы избежать утечек памяти, я решил изменить реализацию моего клиента, чтобы поддерживать WeakReference s для возвращенных CompletableFuture s вместо сильных, так, чтобы несвязанные фьючерсы были исключены, если код клиента они больше не нужны.

Следующий (упрощенный) код демонстрирует клиента:

interface Message {
    Object getID();
    byte[] toByteArray();
}

abstract class AsyncClient<T extends Message, R extends Message> {
    /**
     * This map serves as a registry of CompletableFutures for sent requests.
     * 
     * Previously it was Map<Object, CompletableFuture<R>>. Changed to References
     * to avoid CompletableFutures staying in registry after the recipient code
     * does not wait for them anymore.
     */
    Map<Object, Reference<CompletableFuture<R>>> requestFutureRegistry = new ConcurrentHashMap<>();

    /**
     * Write the request to the service
     * @param serializedRequest bytes to write
     */
    protected abstract void writeRequest(byte[] serializedRequest);

    /**
     * Make sure to clean up the registry from Futures that arte not held by the 
     * recipient code anymore.
     */
    private void deleteDanglingFutures() {
        requestFutureRegistry.entrySet().removeIf(entry -> entry.getValue().get() == null);
    }

    /**
     * 
     * @param request to be written to service
     * @return a CompletableFuture for the request's response
     */
    public CompletableFuture<R> sendRequest(T request) {
        deleteDanglingFutures();

        CompletableFuture<R> future = new CompletableFuture<>();
        requestFutureRegistry.put(request.getID(), new WeakReference<>(future));

        writeRequest(request.toByteArray());

        return future;
    }

    void receivedResponse(R response) {
        deleteDanglingFutures();

        Reference<CompletableFuture<R>> futureRef = requestFutureRegistry.get(response.getID());
        if (futureRef != null) {
            CompletableFuture<R> future = futureRef.get();
            if (future != null) {
                future.complete(response);
            }
        }
    }
}

Этот подход, казалось, работал, пока я не начал связывать CompletableFuture s, используя методы thenXXX. например, когда я делал такие вещи как:

AsyncClient client;
Request request1;
// ...

Response response = client.sendRequest(request1)
                          .thenCompose(response1 -> 
                                       client.sendRequest(request2)
                           ).get(timeout, TimeUnit.MILLISECONDS);

Я заметил, что внутреннее будущее (от звонка до client.sendRequest(request2)) получит иногда GC'd, прежде чем оно получит шанс быть завершенным.

Еще немного углубляясь в реализацию JDK, кажется, что не сохраняется сильной ссылки на внутреннее будущее (как объяснено в в этом посте ).

У меня вопрос: каков будет правильный способ гарантировать, что зарегистрированные фьючерсы будут удалены, если клиентский код больше не ссылается на них (напрямую или через цепочку thenXXX), и все же остается достаточно долго для завершения, если они все еще нужно?

РЕДАКТИРОВАТЬ + мотивация

Теперь я понимаю (спасибо @Holger), что Future::get не является распространенным использованием CompletableFuture s, однако в моем случае это очень распространено. В частности, один вариант использования, который мне нужно поддержать, - это отправка нескольких запросов в ожидании ответа первого (используя anyOf, а затем get с таймаутом). Заранее известно, что многие запросы никогда не получат ответа от службы, поэтому мне нужно (желательно автоматически) очистить неиспользуемые фьючерсы после тайм-аута.

...