N + 1 пакетирование HTTP-вызовов с одновременной очередью - PullRequest
1 голос
/ 20 ноября 2019

Предположим, у меня есть метод, который извлекает данные из HTTP API

public R getResource(String id){
      //HTTP call to 
      return fetch("http://example.com/api/id")
}

Но http://example.org/api/ поддерживает несколько идентификаторов одновременно, скажем http://example.org/api/id1,id2,id3

Вмногопоточная среда, я хочу блокировать, пока я не соберу m идентификаторов, а затем одним выстрелом получу данные из API.

Кроме того, чтобы избежать бесконечных / длинных блоков, должно быть ожиданиеtimeout.

для m = 5 Допустим, 20 потоков одновременно прибывают для вызова этого метода, затем необходимо отправить 4 пакета запросов в API HTTP.

Любое предложение по реализации или существующие платформы для поддержкиэтот пакет.

Редактировать предложения приветствуются.

1 Ответ

0 голосов
/ 21 ноября 2019

Используйте BlockingQueue с потоком, выполняющим BlockingQueue::poll(long timeout, TimeUnit unit) с вычисленным временем ожидания, например, так, чтобы первый запрос ожидал не более некоторой фиксированной продолжительности.

Поток опроса будет собирать идентификаторы из очередив своем собственном списке, пока он не будет иметь m идентификаторов или пока не будет достигнута максимальная продолжительность ожидания. Такой поток должен быть только один.

В приведенном выше списке должны быть записи, содержащие как идентификатор, так и CompletableFuture<R>, который завершается с использованием результата вызова. Будущее - это то, что вы даете звонящему. Вместо списка вы можете использовать Map<String, CompletableFuture<R>>, чтобы по завершении запроса вы могли легко завершить фьючерсы. На самом деле, очередь также должна содержать будущее, так что вы можете вернуть его вызывающей стороне.

Грубый набросок:

class ResourceMultigetter<R> {
    private final BlockingQueue<Map.Entry<String, CompletableFuture<R>>> newEntries = ...;

    private final Map<String, CompletableFuture<R>> collected = ...;

    private long millisOfFirstWaitingRequest;

    private volatile boolean stopped;

    class Processor implements Runnable {
        @Override
        public void run() { // run by the polling thread
            while (!stopped) {
                final Map.Entry<String, CompletableFuture<R>> e = newEntries.poll(....);
                if (e == null) {
                    if (!timeHasElapsed()) continue;
                } else {
                    if (collected.isEmpty()) {
                        millisOfFirstWaitingRequest = System.currentTimeMillis();
                    }
                    collected.put(e.getKey(), e.getValue());
                    if (collected.size() < m && !timeHasElapsed()) continue;

                }
                final List<String> processedIds = callTheServer();
                processedIds.forEach(id -> collected.remove(id));
            }
        }
    }

    public CompletableFuture<R> enqueue(String id) {
        final CompletableFuture<R> result = new CompletableFuture<>();
        newEntries.add(new AbstractMap.SimpleImmutableEntry<>(id, result));
        return result;
    }
}

Вы бы инициализировали его как

ResourceMultigetter resourceMultigetter = new ResourceMultigetter();
new Thread(resourceMultigetter.new Processor()).start();

Код клиента будет делать что-то вроде

R r = resourceMultigetter.enqueue(id); // this blocks
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...