Использование ExecutorService с деревом задач для выполнения - PullRequest
3 голосов
/ 10 марта 2011

У нас была небольшая проблема. :)

Мы хотим обеспечить, чтобы только N потоков выполняли фоновые задачи в любое время. Для этого мы использовали фиксированный исполнитель пула потоков. Казалось, работает нормально.

Тогда мы нашли проблему. Предположим, у вас есть класс, который использует executor для выполнения некоторой параллельной работы, а затем вызывает другой класс, находясь в потоке executor, который также выполняет некоторую параллельную работу, намереваясь ждать его. Вот что происходит:

  • Основной поток вызывает метод первого уровня.
  • Этот метод думает, что он может распараллелить на 16 задач и разбивает свою работу.
  • 16 заданий передаются исполнителю.
  • Главный поток начинает ждать завершения своих задач.
  • Предположим, что доступно четыре потока, и первые четыре задачи выполняются. Таким образом, в очереди осталось 12 задач.
  • Теперь одна из этих задач вызывает другой метод.
  • Этот новый метод считает, что он может распараллеливаться на 2 задачи. Скажем, это первый шаг в параллельной сортировке слиянием или что-то в этом роде.
  • 2 задания передаются исполнителю.
  • Этот поток теперь начинает ждать завершения своих задач.

Ой-ой. Таким образом, на этом этапе все четыре потока теперь будут ожидать завершения задач, но они совместно блокируют исполнителя, фактически выполняющего эти задачи.

Решение 1 этой проблемы заключалось в следующем: при отправке нового задания исполнителю, если мы уже запускаем все наши потоки, и мы уже работаем в одном из потоков исполнителя, запустите задачу встроенным образом. Это работало нормально в течение 10 месяцев, но теперь мы столкнулись с проблемой. Если новые задачи, которые он отправляет, все еще относительно велики, то вы можете попасть в ситуацию, когда новая задача блокирует метод от добавления других задач в очередь, которые в противном случае могли бы быть подхвачены другими рабочими потоками. Таким образом, вы получаете периоды огромных задержек, пока поток обрабатывает работу в потоке.

Есть ли лучшее решение основной проблемы - выполнение потенциально неограниченного дерева фоновых задач? Я понимаю, что .NET-эквивалент службы executor обладает некой встроенной способностью красть из очереди, что предотвращает возникновение исходной тупиковой ситуации, что, насколько я могу судить, является идеальным решением. Но как насчет того, чтобы на земле Явы?

Ответы [ 3 ]

3 голосов
/ 10 марта 2011

Java 7 имеет концепцию ForkJoinPool, которая позволяет задаче «раскошелиться» на другую задачу, отправив ее одному и тому же исполнителю.Затем он дает возможность позднее попытаться «помочь присоединиться» к этой задаче, пытаясь выполнить ее, если она еще не была выполнена.

Я считаю, что то же самое можно сделать в Java 6, просто комбинируя Executor с FutureTask.Вот так:

public class Fib implements Callable<Integer> {
    int n;
    Executor exec;

    Fib(final int n, final Executor exec) {
        this.n = n;
        this.exec = exec;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Integer call() throws Exception {
        if (n == 0 || n == 1) {
            return n;
        }

        //Divide the problem
        final Fib n1 = new Fib(n - 1, exec);
        final Fib n2 = new Fib(n - 2, exec);

        //FutureTask only allows run to complete once
        final FutureTask<Integer> n2Task = new FutureTask<Integer>(n2);
        //Ask the Executor for help
        exec.execute(n2Task);

        //Do half the work ourselves
        final int partialResult = n1.call();

        //Do the other half of the work if the Executor hasn't
        n2Task.run();

        //Return the combined result
        return partialResult + n2Task.get();
    }

}        
1 голос
/ 10 марта 2011

Вы можете использовать обратные вызовы вместо того, чтобы ваш поток ожидал выполнения задач. Ваши задачи сами должны быть обратными вызовами, поскольку они отправляют больше задач.

например:.

public class ParallelTask implements Runnable, Callback {
  private final Callback mCB;
  private final int mNumChildTasks;
  private int mTimesCalledBack = 0;
  private final Object mLock = new Object();
  private boolean mCompleted = false;
  public ParallelTask(Callback cb) {
    mCB = cb;
    mNumChildTasks = N; // the number of direct child tasks you know this task will spawn
    // only going down 1 generation
    // of course you could figure this number out in the run method (will need to be volatile if so)
   // just as long as it is set before submitting any child tasks for execution
  }

  @Override
  public void run() {
    // do your stuff
    // and submit your child tasks, but don't wait on them to complete
    synchronized(mLock) {
      mCompleted = true;
      if (mNumChildTasks == mTimesCalledBack) {
        mCB.taskCompleted();
      }
    }
  }

  // Callback interface
  // taskCompleted is being called from the threads that this task's children are running in
  @Override
  public void taskCompleted() {
    synchronized(mLock) {
      mTimesCalledBack++;
      // only call our parent back if our direct children have all called us back
      // and our own task is done
      if (mCompleted && mTimesCalledBack == mNumChildTasks) {
        mCB.taskCompleted();
      }
    }
  }
}

В вашем основном потоке вы отправляете свою корневую задачу и регистрируете обратный вызов для выполнения.

Поскольку все дочерние задачи не сообщают о завершении, пока их дети не сообщили о завершении, ваш корневой обратный вызов не должен вызываться, пока все не будет сделано.

Я написал это на лету и не тестировал и не компилировал, так что могут быть некоторые ошибки.

0 голосов
/ 10 марта 2011

Похоже, проблема в том, что задачи также пытаются распараллелить себя, что затрудняет избегание ресурсных ограничений. Зачем тебе это нужно? Почему бы не всегда запускать подзадачи в строке?

Если вы полностью используете процессор уже путем распараллеливания, то вы не собираетесь много покупать с точки зрения общей работы, проделанной путем разделения работы на более мелкие задачи.

...