Я пишу асинхронный клиент. Каждый раз, когда отправляется запрос, клиент создает, регистрируется внутри, а затем возвращает CompletableFuture
, который будет завершен после получения ответа сервера на запрос отправки.
Проблема в том, что некоторые запросы никогда не получают ответ, и клиентский код отказывается от них после некоторого времени ожидания. И так, я получаю несколько зарегистрированных CompletableFuture
s, на которые клиентский код больше не содержит ссылки, и поэтому должен иметь право на сборку мусора.
Чтобы избежать утечек памяти, я решил изменить реализацию моего клиента, чтобы поддерживать WeakReference
s для возвращенных CompletableFuture
s вместо сильных, так, чтобы несвязанные фьючерсы были исключены, если код клиента они больше не нужны.
Следующий (упрощенный) код демонстрирует клиента:
interface Message {
Object getID();
byte[] toByteArray();
}
abstract class AsyncClient<T extends Message, R extends Message> {
/**
* This map serves as a registry of CompletableFutures for sent requests.
*
* Previously it was Map<Object, CompletableFuture<R>>. Changed to References
* to avoid CompletableFutures staying in registry after the recipient code
* does not wait for them anymore.
*/
Map<Object, Reference<CompletableFuture<R>>> requestFutureRegistry = new ConcurrentHashMap<>();
/**
* Write the request to the service
* @param serializedRequest bytes to write
*/
protected abstract void writeRequest(byte[] serializedRequest);
/**
* Make sure to clean up the registry from Futures that arte not held by the
* recipient code anymore.
*/
private void deleteDanglingFutures() {
requestFutureRegistry.entrySet().removeIf(entry -> entry.getValue().get() == null);
}
/**
*
* @param request to be written to service
* @return a CompletableFuture for the request's response
*/
public CompletableFuture<R> sendRequest(T request) {
deleteDanglingFutures();
CompletableFuture<R> future = new CompletableFuture<>();
requestFutureRegistry.put(request.getID(), new WeakReference<>(future));
writeRequest(request.toByteArray());
return future;
}
void receivedResponse(R response) {
deleteDanglingFutures();
Reference<CompletableFuture<R>> futureRef = requestFutureRegistry.get(response.getID());
if (futureRef != null) {
CompletableFuture<R> future = futureRef.get();
if (future != null) {
future.complete(response);
}
}
}
}
Этот подход, казалось, работал, пока я не начал связывать CompletableFuture
s, используя методы thenXXX. например, когда я делал такие вещи как:
AsyncClient client;
Request request1;
// ...
Response response = client.sendRequest(request1)
.thenCompose(response1 ->
client.sendRequest(request2)
).get(timeout, TimeUnit.MILLISECONDS);
Я заметил, что внутреннее будущее (от звонка до client.sendRequest(request2)
) получит иногда GC'd, прежде чем оно получит шанс быть завершенным.
Еще немного углубляясь в реализацию JDK, кажется, что не сохраняется сильной ссылки на внутреннее будущее (как объяснено в в этом посте ).
У меня вопрос: каков будет правильный способ гарантировать, что зарегистрированные фьючерсы будут удалены, если клиентский код больше не ссылается на них (напрямую или через цепочку thenXXX), и все же остается достаточно долго для завершения, если они все еще нужно?
РЕДАКТИРОВАТЬ + мотивация
Теперь я понимаю (спасибо @Holger), что Future::get
не является распространенным использованием CompletableFuture
s, однако в моем случае это очень распространено. В частности, один вариант использования, который мне нужно поддержать, - это отправка нескольких запросов в ожидании ответа первого (используя anyOf
, а затем get
с таймаутом). Заранее известно, что многие запросы никогда не получат ответа от службы, поэтому мне нужно (желательно автоматически) очистить неиспользуемые фьючерсы после тайм-аута.