Используйте BlockingQueue
с потоком, выполняющим BlockingQueue::poll(long timeout, TimeUnit unit)
с вычисленным временем ожидания, например, так, чтобы первый запрос ожидал не более некоторой фиксированной продолжительности.
Поток опроса будет собирать идентификаторы из очередив своем собственном списке, пока он не будет иметь m
идентификаторов или пока не будет достигнута максимальная продолжительность ожидания. Такой поток должен быть только один.
В приведенном выше списке должны быть записи, содержащие как идентификатор, так и CompletableFuture<R>
, который завершается с использованием результата вызова. Будущее - это то, что вы даете звонящему. Вместо списка вы можете использовать Map<String, CompletableFuture<R>>
, чтобы по завершении запроса вы могли легко завершить фьючерсы. На самом деле, очередь также должна содержать будущее, так что вы можете вернуть его вызывающей стороне.
Грубый набросок:
class ResourceMultigetter<R> {
private final BlockingQueue<Map.Entry<String, CompletableFuture<R>>> newEntries = ...;
private final Map<String, CompletableFuture<R>> collected = ...;
private long millisOfFirstWaitingRequest;
private volatile boolean stopped;
class Processor implements Runnable {
@Override
public void run() { // run by the polling thread
while (!stopped) {
final Map.Entry<String, CompletableFuture<R>> e = newEntries.poll(....);
if (e == null) {
if (!timeHasElapsed()) continue;
} else {
if (collected.isEmpty()) {
millisOfFirstWaitingRequest = System.currentTimeMillis();
}
collected.put(e.getKey(), e.getValue());
if (collected.size() < m && !timeHasElapsed()) continue;
}
final List<String> processedIds = callTheServer();
processedIds.forEach(id -> collected.remove(id));
}
}
}
public CompletableFuture<R> enqueue(String id) {
final CompletableFuture<R> result = new CompletableFuture<>();
newEntries.add(new AbstractMap.SimpleImmutableEntry<>(id, result));
return result;
}
}
Вы бы инициализировали его как
ResourceMultigetter resourceMultigetter = new ResourceMultigetter();
new Thread(resourceMultigetter.new Processor()).start();
Код клиента будет делать что-то вроде
R r = resourceMultigetter.enqueue(id); // this blocks