Минимальное время блокировки в Java - PullRequest
2 голосов
/ 30 сентября 2011

Я ищу способ ограничить доступ к методу в Java не более одного раза в X секунд.Вот моя ситуация:

Я хочу запустить этот код параллельно с несколькими потоками:

private MyService service;
public void run() {
    // Send request to remote service
    InputStream response = service.executeRequest();

    // Process response
    ... some more code
}

Метод executeRequest () отправляет запрос http на удаленный сервер (который не является моим)У меня нет доступа к его реализации) и жду ответа от сервера.Затем он выполняет некоторую обработку данных.Я хотел бы, чтобы многие потоки запускали это параллельно.Моя проблема в том, что удаленный сервер зависнет, если одновременно будет отправлено слишком много запросов.Поэтому я хочу как-то убедиться, что метод executeRequest () никогда не будет вызываться чаще, чем раз в секунду.

Знаете ли вы, как я мог бы сделать это в Java?Спасибо

Ответы [ 6 ]

4 голосов
/ 30 сентября 2011

Хм, я не уверен, что ограничение частоты доступа к методу приведет к предотвращению перегрузки.

Возможно, в вышеприведенном посте недостаточно информации, но кажется, что настройка WorkerThread + JobQueue здесь бы отлично работала.

Пища для размышлений: Менеджер многопоточных очередей заданий

РЕДАКТИРОВАТЬ : Попытка быть немного менее расплывчатой ​​...

  • Пусть сервер соберет запросы в некоторую структуру данных, возможно, в классназывается работа.
  • Пусть Джобс будет помещен в конец очереди.
  • Пусть объекты WorkerThread выталкивают объекты Job из верхней части очереди и обрабатывают их.
  • Удостоверьтесь, что вы создали только столько объектов WorkerThread, сколько вам необходимо для поддержания правильной нагрузки на сервер.Только эксперименты определят это число, но, как очень грубое правило, начните с количества процессорных ядер - 1. (иначе 7 рабочих на 8-ядерном компьютере)

РЕДАКТИРОВАТЬ # 2 В свете новой информации:

  • Настройка очереди на стороне клиента
  • Создание работника, который может отслеживать, какие задания были отправлены, какие задания получили ответ и какиеРабота еще обрабатывается.Это позволит ограничить количество заданий, отправляемых за один раз.
  • Сделайте так, чтобы Worker отслеживал 'lastSubmissionTime', чтобы предотвратить любую отправку <1 секунды с предыдущей </li>
2 голосов
/ 30 сентября 2011

Вы можете использовать семафор для регулирования количества потоков, способных вызвать executeRequest ():

http://download.oracle.com/javase/1,5,0/docs/api/java/util/concurrent/Semaphore.html

Выполняющий поток может увеличивать семафор до входа в исполняемый файл, а другие потоки могут ожидать, пока он упадет до 0 или числа, которое отражает количество, разрешенное для параллельной работы.

Таймер:

http://download.oracle.com/javase/1.4.2/docs/api/java/util/TimerTask.html

Может использоваться для уменьшения семафора через 3 секунды ... Регулирование записи не более чем одному новому участнику каждые 3 секунды:

1 голос
/ 30 сентября 2011

Ограничение параллелизма на стороне клиента не является хорошим примером - как клиенты должны знать друг о друге?

0 голосов
/ 30 сентября 2011

В вашем классе контроллера, где вы вызываете работника для выполнения службы, используйте ExecutorService , чтобы запустить пул потоков для использования рабочего класса.

ExecutorService pool = Executors.newFixedThreadPool(10);
pool.submit(new MyServiceImpl(someObject));

Чтобы добавить к тому, что другиеуже предложили: «Выполнить выполнение рабочего класса из очереди рабочим классом и подождать столько минут, сколько вам требуется, прежде чем выполнять другую задачу из очереди».Я сделал это 2 минуты в качестве примера.

Пример:

public class MyServiceImpl implements MyService , Runnable {

  public static final int MAX_SIZE = 10;
  private final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(MAX_SIZE);

    @Override
    public void run() {
    try
    {
     Object obj;
      while ((obj==queue.take()) != null)
      {
       executeRequest(obj);
       //wait for 2 min
       Thread.sleep(1000 * 60 * 2);
      }
    }
    catch (InterruptedException e)
    {}
  }

  public void executeRequest(Object obj)
  {
    // do routine
  }  

  public MyServiceImpl (Object token)
  {
    try
    {
      queue.put(token);
    }
    catch (InterruptedException e)
    {
      throw new AssertionError(e);
    }
  }
}
0 голосов
/ 30 сентября 2011

Используя динамический прокси, вы можете обернуть свой сервис и обработать максимальное выполнение в InvocationHandler:

MyService proxy = (MyService) Proxy.newProxyInstance( //
    MyService.class.getClassLoader(), //
    new Class[] {MyService.class}, //
    new MaxInvocationHandler());

, где наивная реализация InvocationHandler может выглядеть примерно так:

  class MaxInvocationHandler implements InvocationHandler {
    private static final long MAX_INTERVAL = 1000L;
    private static final long MAX_INVOCATIONS = 1;

    AtomicLong time = new AtomicLong();
    AtomicLong counter = new AtomicLong();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      long currentTime = System.currentTimeMillis();
      if (time.get() < currentTime) {
        time.set(currentTime + MAX_INTERVAL);
        counter.set(1);
      } else if(counter.incrementAndGet() > MAX_INVOCATIONS) {
        throw new RuntimeException("Max invocation exceeded");
      }

      return method.invoke(proxy, args);
    }
  }
0 голосов
/ 30 сентября 2011

Задумывались ли вы об использовании sleep для остановки потоков перед тем, как перейти к удаленному вызову?Вы можете сделать так, чтобы потоки спали в течение случайного количества секунд между 1 и 5, что ограничивало бы количество потоков, обращающихся к методу в любое время.

Вы также можете установить lock на метод, срок действия которого истекает через 1 секунду, чтобы каждый поток «захватывает» блокировку, выполняет метод, но срок его блокировки истекает, поэтому следующий поток может его захватить.и выполнить.Установите блокировку в начале метода - и пусть он ничего не делает, кроме как удерживает поток в течение одной секунды thread.sleep(1000), а затем продолжает выполнение.Это ограничит вас одним потоком, попадающим в метод за раз.

РЕДАКТИРОВАТЬ: Ответ на комментарий ОП ниже

   class X {
   private final ReentrantLock lock = new ReentrantLock();
   // ...

   public void m() { 
     lock.lock();  // block until condition holds
     try {
       thread.sleep(1000) //added to example by floppydisk.
     } finally {
       lock.unlock()
       doYourConnectionHere();
     }
   }
 }

Изменено с: ReentrantLock .Внутри try / catch все, что вы делаете, это thread.sleep (1000) вместо того, чтобы что-то делать.Затем он снимает блокировку для следующего потока и продолжает выполнять оставшуюся часть тела метода - в вашем случае соединение с удаленным сервером.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...