Java "в порядке" семафор - PullRequest
       61

Java "в порядке" семафор

0 голосов
/ 02 ноября 2018

У меня есть проблема и смутное представление о том, как ее исправить, но я постараюсь переоценить контекст, чтобы избежать проблемы XY .

У меня есть асинхронный метод, который немедленно возвращает гуаву ListenableFuture, которую мне нужно вызывать сотни тысяч или миллионы раз (самому будущему может потребоваться некоторое время для завершения). Я не могу действительно изменить внутренности этого метода. Это связано с серьезным конфликтом ресурсов, поэтому я бы хотел ограничить количество обращений к этому методу за один раз. Поэтому я попытался использовать Semaphore:

public class ConcurrentCallsLimiter<In, Out> {
  private final Function<In, ListenableFuture<Out>> fn;
  private final Semaphore semaphore;
  private final Executor releasingExecutor;

  public ConcurrentCallsLimiter(
      int limit, Executor releasingExecutor,
      Function<In, ListenableFuture<Out>> fn) {
    this.semaphore = new Semaphore(limit);
    this.fn = fn;
    this.releasingExecutor = releasingExecutor;
  }

  public ListenableFuture<Out> apply(In in) throws InterruptedException {
    semaphore.acquire();
    ListenableFuture<Out> result = fn.apply(in);
    result.addListener(() -> semaphore.release(), releasingExecutor);
    return result;
  }
}

Итак, я могу просто обернуть свой вызов в этом классе и вместо этого вызвать:

ConcurrentLimiter<Foo, Bar> cl =
    new ConcurrentLimiter(10, executor, someService::turnFooIntoBar);
for (Foo foo : foos) {
  ListenableFuture<Bar> bar = cl.apply(foo);
  // handle bar (no pun intended)
}

Этот работает . Проблема в том, что задержка хвоста действительно плохая. Некоторые вызовы становятся «неудачными» и в конечном итоге занимают много времени, пытаясь получить ресурсы в рамках этого вызова метода. Это усугубляется некоторой внутренней логикой экспоненциального отката, так что неудачный вызов получает все меньше и меньше шансов на получение необходимых ресурсов по сравнению с новыми вызовами, которые более нетерпеливы и ждут гораздо меньше времени, прежде чем пытаться снова.

Что было бы идеально, чтобы исправить это, если бы было что-то похожее на этот семафор, но у которого было понятие порядка. Например, если лимит равен 10, то 11-й вызов в настоящее время должен ждать любого из первых 10, чтобы завершиться. Мне бы хотелось, чтобы 11-й звонок ожидал завершения самого первого первого вызова. Таким образом, «несчастливые» звонки не будут истощаться новыми поступающими звонками.

Кажется, что я мог бы назначить целочисленный порядковый номер для вызовов и каким-то образом отследить самый низкий из них, который еще не завершен, но я не мог понять, как это сделать, тем более что на самом деле нет никакого полезного "ожидания" методы на AtomicInteger или как угодно.

Ответы [ 3 ]

0 голосов
/ 02 ноября 2018

Одно нереальное решение, о котором я думал, это использование Queue для хранения всех фьючерсов:

public class ConcurrentCallsLimiter<In, Out> {
  private final Function<In, ListenableFuture<Out>> fn;
  private final int limit;
  private final Queue<ListenableFuture<Out>> queue = new ArrayDeque<>();

  public ConcurrentCallsLimiter(int limit, Function<In, ListenableFuture<Out>> fn) {
    this.limit = limit;
    this.fn = fn;
  }

  public ListenableFuture<Out> apply(In in) throws InterruptedException {
    if (queue.size() == limit) {
      queue.remove().get();
    }
    ListenableFuture<Out> result = fn.apply(in);
    queue.add(result);
    return result;
  }
}

Однако это кажется большой тратой памяти. Вовлеченные объекты могут быть немного большими, и предел может быть установлен довольно высоко. Так что я открыт для лучших ответов, которые не используют O (n) памяти. Кажется, что это должно быть выполнимо в O (1).

0 голосов
/ 04 ноября 2018

«если было что-то похожее на этот семафор, но в нем было понятие порядка». Существует такой зверь. Это называется Blocking queue. Создайте такую ​​очередь, поместите туда 10 элементов и используйте take вместо acquire и put вместо release.

0 голосов
/ 02 ноября 2018

Вы можете создать семафор с параметром справедливости:

Semaphore(int permits, boolean fair)

// справедливо - истина, если этот семафор будет гарантировать первоочередное выдачу разрешений при споре, иначе ложь

...