Карта вычислений: вычисление стоимости раньше времени - PullRequest
12 голосов
/ 08 июля 2010

У меня есть вычислительная карта мягкими значениями ), которую я использую для кэширования результатов дорогостоящих вычислений.

Теперь у меня есть ситуация, когда я знаю, что определенный ключ, вероятно, будет найден в течение следующих нескольких секунд. Этот ключ также дороже для вычисления, чем большинство.

Я хотел бы вычислить значение заранее в потоке с минимальным приоритетом, чтобы, когда значение в конечном итоге запрашивалось, оно уже было кэшировано, что улучшает время отклика.

Какой хороший способ сделать это так:

  1. У меня есть контроль над потоком (в частности, его приоритетом), в котором выполняется вычисление.
  2. Избегается дублирование работы, т. Е. Вычисление выполняется только один раз. Если задача вычисления уже выполняется, то вызывающий поток ждет этой задачи вместо того, чтобы снова вычислять значение (FutureTask реализует это. С картами вычисления Guava это верно, если вы только вызываете get, но не если вы смешиваете это с вызовами put.
  3. Метод «вычислить значение заранее» является асинхронным и идемпотентным. Если вычисление уже выполняется, оно должно немедленно вернуться, не дожидаясь завершения этого вычисления.
  4. Избегайте инверсии приоритетов, например, если поток с высоким приоритетом запрашивает значение, в то время как поток со средним приоритетом делает что-то не связанное, но задача вычисления ставится в очередь в потоке с низким приоритетом, поток с высоким приоритетом не должен истощаться. Возможно, этого можно достичь, временно повысив приоритет вычислительного потока (-ов) и / или запустив вычисления в вызывающем потоке.

Как это можно согласовать между всеми вовлеченными потоками?


Дополнительная информация
Вычисления в моем приложении являются операциями фильтрации изображений, что означает, что они все связаны с процессором. Эти операции включают в себя аффинные преобразования (в диапазоне от 50 мкс до 1 мс) и свертки (до 10 мсек). Конечно, эффективность варьирования приоритетов потоков зависит от способности ОС вытеснять более крупные задачи.

Ответы [ 4 ]

8 голосов
/ 13 июля 2010

Вы можете организовать «однократное» выполнение фоновых вычислений, используя Future с ComputedMap. Будущее представляет задачу, которая вычисляет ценность. Будущее создается ComputedMap и в то же время передается ExecutorService для фонового выполнения. Исполнитель может быть настроен с вашей собственной реализацией ThreadFactory , которая создает потоки с низким приоритетом, например

class LowPriorityThreadFactory implements ThreadFactory
{
   public Thread newThread(Runnable r) {
     Tread t = new Thread(r);
     t.setPriority(MIN_PRIORITY);
     return t;
   }
}

Когда значение необходимо, ваш высокоприоритетный поток затем извлекает будущее из карты и вызывает метод get () для получения результата, ожидая его вычисления в случае необходимости. Чтобы избежать инверсии приоритетов , добавьте в задачу дополнительный код:

class HandlePriorityInversionTask extends FutureTask<ResultType>
{
   Integer priority;  // non null if set
   Integer originalPriority;
   Thread thread;
   public ResultType get() {
      if (!isDone()) 
         setPriority(Thread.currentThread().getPriority());
      return super.get();
   }
   public void run() {
      synchronized (this) {
         thread = Thread.currentThread();
         originalPriority = thread.getPriority();
         if (priority!=null) setPriority(priority);
      } 
      super.run();
   }
   protected synchronized void done() {
         if (originalPriority!=null) setPriority(originalPriority);
         thread = null;
   }

   void synchronized setPriority(int priority) {
       this.priority = Integer.valueOf(priority);
       if (thread!=null)
          thread.setPriority(priority);
   }
}

Это заботится о повышении приоритета задачи до приоритета потока, вызывающего get(), если задача еще не выполнена, и возвращает приоритет оригиналу, когда задача завершается, обычно или иначе. (Короче говоря, код не проверяет, действительно ли приоритет выше, но его легко добавить.)

Когда высокоприоритетная задача вызывает get (), будущее еще не может начаться. Возможно, вы захотите избежать этого, установив большую верхнюю границу для числа потоков, используемых службой executor, но это может быть плохой идеей, поскольку каждый поток может работать с высоким приоритетом, потребляя столько процессоров, сколько он мог раньше ОС выключает это. Пул, вероятно, должен быть того же размера, что и количество аппаратных потоков, например Размер бассейна до Runtime.availableProcessors(). Если задача еще не начала выполняться, вместо того, чтобы ждать, пока исполнитель не запланирует ее (что является формой инверсии приоритетов, поскольку ваш поток с высоким приоритетом ожидает завершения потоков с низким приоритетом), вы можете отменить его с текущего исполнителя и повторно отправьте его на исполнителя, выполняющего только высокоприоритетные потоки.

2 голосов
/ 08 июля 2010

Я подозреваю, что вы идете по неверному пути, сосредоточившись на приоритетах потоков. Обычно данные, содержащиеся в кэше, являются дорогостоящими для вычисления из-за операций ввода-вывода (нехватка памяти) по сравнению с ограничением процессора (логическое вычисление). Если вы пытаетесь угадать будущие действия пользователя, такие как просмотр непрочитанных писем, то это означает, что ваша работа, скорее всего, связана с вводом / выводом. Это означает, что до тех пор, пока не произойдет истощение потоков (что не разрешено планировщиками), игра в игры с приоритетом потоков не принесет значительного улучшения производительности.

Если стоимость представляет собой вызов ввода-вывода, тогда фоновый поток блокируется, ожидая поступления данных, и обработка этих данных должна быть довольно дешевой (например, десериализация). Поскольку изменение приоритета потока не будет сильно ускоряться, выполнения асинхронной работы в фоновом пуле потоков должно быть достаточно. Если штраф за промах в кеше слишком высок, то использование нескольких уровней кэширования помогает еще больше снизить задержку, воспринимаемую пользователем.

2 голосов
/ 08 июля 2010

Один из распространенных способов координации ситуаций такого типа - иметь карту, значения которой являются объектами FutureTask. Таким образом, украдя в качестве примера некоторый код, который я написал с моего веб-сервера, основная идея состоит в том, что для данного параметра мы видим, существует ли уже FutureTask (имеется в виду, что расчет с этим параметром уже запланирован), и если так, то мы ждем этого. В этом примере мы иначе планируем поиск, но это можно сделать в другом месте с помощью отдельного вызова, если это желательно:

  private final ConcurrentMap<WordLookupJob, Future<CharSequence>> cache = ...

  private Future<CharSequence> getOrScheduleLookup(final WordLookupJob word) {
    Future<CharSequence> f = cache.get(word);
    if (f == null) {
      Callable<CharSequence> ex = new Callable<CharSequence>() {
        public CharSequence call() throws Exception {
          return doCalculation(word);
        }
      };
      Future<CharSequence> ft = executor.submit(ex);
      f = cache.putIfAbsent(word, ft);
      if (f != null) {
        // somebody slipped in with the same word -- cancel the
        // lookup we've just started and return the previous one
        ft.cancel(true);
      } else {
        f = ft;
      }
    }
    return f;
  }

С точки зрения приоритетов потоков: интересно, будет ли это достигать того, что, как вы думаете, будет? Я не совсем понимаю вашу мысль о повышении приоритета поиска над ожидающим потоком: если поток ожидает, то он ждет, независимо от относительных приоритетов других потоков ... (Вы можете посмотреть на некоторые статьи, которые я написал о приоритетах потоков и планировании потоков , но, если коротко, я не уверен, что изменение приоритета обязательно принесет вам то, что вы ожидаете .)

1 голос
/ 14 июля 2010

В качестве альтернативы приоритетам потоков вы можете выполнить задачу с низким приоритетом, только если не выполняются задачи с высоким приоритетом.Вот простой способ сделать это:

AtomicInteger highPriorityCount = new AtomicInteger();

void highPriorityTask() {
  highPriorityCount.incrementAndGet();
  try {
    highPriorityImpl();
  } finally {
    highPriorityCount.decrementAndGet();  
  }
}

void lowPriorityTask() {
  if (highPriorityCount.get() == 0) {
    lowPriorityImpl();
  }
}

В вашем случае оба метода Impl () вызовут get () на вычислительной карте, highPriorityImpl () в том же потоке и lowPriorityImpl () вдругой поток.

Вы можете написать более сложную версию, которая откладывает задачи с низким приоритетом до завершения задач с высоким приоритетом и ограничивает количество одновременных задач с низким приоритетом.

...