Какой ThreadPool в Java я должен использовать? - PullRequest
8 голосов
/ 15 июля 2010

Есть огромное количество заданий. Каждое задание относится к одной группе. Требование заключается в том, что каждая группа задач должна выполняться последовательно, как и в одном потоке, а пропускная способность должна быть максимальной в среде с несколькими ядрами (или несколькими процессорами). Примечание: существует также огромное количество групп, которое пропорционально количеству задач.

Наивное решение использует ThreadPoolExecutor и синхронизировать (или заблокировать). Однако потоки будут блокировать друг друга, и пропускная способность не будет максимальной.

Есть идея получше? Или существует сторонняя библиотека, удовлетворяющая требованию?

Ответы [ 5 ]

3 голосов
/ 15 июля 2010

Простой подход состоит в том, чтобы «объединить» все групповые задачи в одну суперзадачу, таким образом заставляя подзадачи выполняться последовательно. Но это, вероятно, вызовет задержку в других группах, которая не запустится, если какая-то другая группа полностью не завершит работу и не освободит место в пуле потоков.

В качестве альтернативы рассмотрите возможность объединения задач группы. Следующий код иллюстрирует это:

public class MultiSerialExecutor {
    private final ExecutorService executor;

    public MultiSerialExecutor(int maxNumThreads) {
        executor = Executors.newFixedThreadPool(maxNumThreads);
    }

    public void addTaskSequence(List<Runnable> tasks) {
        executor.execute(new TaskChain(tasks));
    }

    private void shutdown() {
        executor.shutdown();
    }

    private class TaskChain implements Runnable {
        private List<Runnable> seq;
        private int ind;

        public TaskChain(List<Runnable> seq) {
            this.seq = seq;
        }

        @Override
        public void run() {
            seq.get(ind++).run(); //NOTE: No special error handling
            if (ind < seq.size())
                executor.execute(this);
        }       
    }

Преимущество заключается в том, что дополнительный ресурс (поток / очередь) не используется, а детализация задач лучше, чем в наивном подходе. Недостатком является то, что все задачи группы должны быть известны заранее .

- изменить -

Чтобы сделать это решение универсальным и полным, вам может потребоваться принять решение об обработке ошибок (т. Е. Будет ли цепочка продолжаться даже в случае возникновения ошибки), а также было бы неплохо реализовать ExecutorService и делегировать все вызовы основной исполнитель.

2 голосов
/ 15 июля 2010

Я бы предложил использовать очереди задач:

  • Для каждой группы задач Вы создали очередь и вставили в нее все задачи из этой группы.
  • Теперь все ваши очередиможет выполняться параллельно, в то время как задачи внутри одной очереди выполняются последовательно.

Быстрый поиск в Google показывает, что в Java API нет никаких очередей задач / потоков.Однако есть много учебных пособий по кодированию один.Каждый может свободно перечислить хорошие учебники / реализации, если Вы знаете некоторые из них:

1 голос
/ 15 июля 2010

Я в основном согласен с ответом Дэйва, но если вам нужно распределить процессорное время по всем «группам», то есть все группы задач должны развиваться параллельно, вы можете найти этот тип конструкции полезным (используя удаление в качестве «блокировки». работал нормально в моем случае, хотя я думаю, что это имеет тенденцию использовать больше памяти):

class TaskAllocator {
    private final ConcurrentLinkedQueue<Queue<Runnable>> entireWork
         = childQueuePerTaskGroup();

    public Queue<Runnable> lockTaskGroup(){
        return entireWork.poll();
    }

    public void release(Queue<Runnable> taskGroup){
        entireWork.offer(taskGroup);
    }
 }

и

 class DoWork implmements Runnable {
     private final TaskAllocator allocator;

     public DoWork(TaskAllocator allocator){
         this.allocator = allocator;
     }

     pubic void run(){
        for(;;){
            Queue<Runnable> taskGroup = allocator.lockTaskGroup();
            if(task==null){
                //No more work
                return;
            }
            Runnable work = taskGroup.poll();
            if(work == null){
                //This group is done
                continue;
            }

            //Do work, but never forget to release the group to 
            // the allocator.
            try {
                work.run();
            } finally {
                allocator.release(taskGroup);
            }
        }//for
     }
 }

Затем вы можете использовать оптимальное количество потоков для запуска задачи DoWork. Это своего рода круговой баланс нагрузки.

Вы даже можете сделать что-то более сложное, используя это вместо простой очереди в TaskAllocator (группы задач с большим количеством оставшихся задач, как правило, выполняются)

ConcurrentSkipListSet<MyQueue<Runnable>> sophisticatedQueue = 
    new ConcurrentSkipListSet(new SophisticatedComparator());

, где SophisticatedComparator равно

class SophisticatedComparator implements Comparator<MyQueue<Runnable>> {
    public int compare(MyQueue<Runnable> o1, MyQueue<Runnable> o2){
        int diff = o2.size() - o1.size();
        if(diff==0){
             //This is crucial. You must assign unique ids to your 
             //Subqueue and break the equality if they happen to have same size.
             //Otherwise your queues will disappear...
             return o1.id - o2.id;
        }
        return diff;
    }
 }
0 голосов
/ 28 октября 2014

У меня была проблема, похожая на вашу, и я использовал ExecutorCompletionService, который работает с Executor для выполнения наборов задач. Вот выдержка из API java.util.concurrent, начиная с Java7:

Предположим, у вас есть набор решателей для определенной проблемы, каждый из которых возвращает значение некоторого типа Result, и хотел бы запустить их одновременно, обрабатывая результаты каждого из них, которые возвращают ненулевое значение, в некотором методе использовать (Результат г). Вы могли бы написать это как:

void solve(Executor e, Collection<Callable<Result>> solvers)
        throws InterruptedException, ExecutionException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    for (Callable<Result> s : solvers)
        ecs.submit(s);
    int n = solvers.size();
    for (int i = 0; i < n; ++i) {
        Result r = ecs.take().get();
        if (r != null)
            use(r);
    }
}

Таким образом, в вашем сценарии каждая задача будет представлять собой один Callable<Result>, а задачи будут сгруппированы в Collection<Callable<Result>>.

Справка: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html

0 голосов
/ 14 октября 2010

Актер также является другим решением для указанного типа проблем.У Scala есть актеры, а также Java, предоставленные AKKA.

...