500 рабочих потоков, что за пул потоков? - PullRequest
7 голосов
/ 19 мая 2010

Мне интересно, если это лучший способ сделать это. У меня около 500 потоков, которые работают бесконечно, но Thread.sleep на минуту, когда выполняется один цикл обработки.

   ExecutorService es = Executors.newFixedThreadPool(list.size()+1);
   for (int i = 0; i < list.size(); i++) {
      es.execute(coreAppVector.elementAt(i)); //coreAppVector is a vector of extends thread objects
   }

Код, который выполняется, действительно прост и в основном это

class aThread extends Thread {
   public void run(){
      while(true){
         Thread.sleep(ONE_MINUTE);
         //Lots of computation every minute
      }
   }
}

Мне нужны отдельные потоки для каждой выполняемой задачи, поэтому изменение архитектуры не вариант. Я попытался сделать мой размер ThreadPool равным Runtime.getRuntime (). AvailableProcessors (), который пытался запустить все 500 потоков, но только 8 (4xhyperthreading) из них выполнялись. Другие потоки не сдались бы и позволили другим потокам иметь свою очередь. Я попытался положить в wait () и notify (), но все равно не повезло. Если у кого-то есть простой пример или несколько советов, буду благодарен!

Ну, дизайн, возможно, имеет недостатки. Потоки реализуют Genetic-Programming или GP, тип алгоритма обучения. Каждый поток анализирует передовые тенденции, делает прогнозы. Если поток когда-либо завершается, обучение теряется. Тем не менее, я надеялся, что sleep () позволит мне поделиться некоторыми ресурсами, пока один поток не "изучает"

Таким образом, фактические требования

как я могу планировать задачи, которые поддерживают состояние и запуск каждые 2 минуты, но контролировать, сколько выполнить одновременно.

Ответы [ 11 ]

13 голосов
/ 19 мая 2010

Если ваши потоки не заканчиваются, то это ошибка кода внутри потока, а не пула потоков. Для получения более подробной помощи вам необходимо опубликовать код, который выполняется.

Кроме того, почему вы кладете каждый поток в спящий режим, когда он сделан; не лучше ли дать ему закончить?

Кроме того, я думаю, что вы неправильно используете пул потоков, поскольку количество потоков равно числу задач, которые вы хотите выполнить. Смысл пула потоков состоит в том, чтобы ограничить количество используемых ресурсов; этот подход ничуть не лучше, чем вообще не использовать пул потоков.

Наконец, вам не нужно передавать экземпляры Thread в ExecutorService, просто экземпляры Runnable. ExecutorService поддерживает свой собственный пул потоков, который зацикливается на неопределенный срок, отбирая работу из внутренней очереди (работа, которую отправляют Runnable).

10 голосов
/ 19 мая 2010

Почему бы не использовать от ScheduledExecutorService до расписания каждой задачи для запуска один раз в минуту, вместо того, чтобы все эти потоки простаивали в течение полной минуты?

ScheduledExecutorService workers = 
  Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
for (Runnable task : list) { 
  workers.scheduleWithFixedDelay(task, 0, 1, TimeUnit.MINUTES);
}

Что вы подразумеваете под «изменением архитектуры - это не вариант»? Если вы имеете в виду, что вы не можете изменить свою задачу вообще (в частности, задачи должны выполняться циклически, а не выполняться один раз, и вызов Thread.sleep() не может быть удален), тогда "хорошо производительность не вариант, "либо.

3 голосов
/ 19 мая 2010

Я не уверен, что ваш код семантически корректен в том, как он использует пул потоков. ExecutionService создает и управляет внутренними потоками, клиент должен просто предоставить экземпляр Runnable, метод run () которого будет выполняться в контексте одного из объединенных потоков Вы можете проверить мой пример . Также обратите внимание, что каждый работающий поток занимает ~ 10 МБ системной памяти для стека, а в linux отображение java-to-native потоков - 1: 1

2 голосов
/ 19 мая 2010

Чтобы ответить на ваш вопрос, какой тип пула потоков?

Я разместил свои комментарии, но это действительно должно решить вашу проблему. У вас есть вычисления, которые могут занять 2 секунды. У вас есть много заданий (500), которые вы хотите выполнить как можно быстрее. Максимально возможная пропускная способность, которую вы можете достичь при условии отсутствия ввода-вывода и / или сетевого трафика, достигается при Runtime.getRuntime().availableProcessors() количестве потоков.

Если вы увеличите свое число до 500 потоков, то каждая задача будет выполняться в своем собственном потоке, но ОС будет планировать поток так часто, чтобы передавать другому потоку. Это 125 переключение контекста в любой точке. Каждое переключение контекста увеличивает время выполнения каждой задачи.

Общая картина здесь в том, что добавление большего количества потоков НЕ равняется большей пропускной способности, когда вы превышаете количество процессоров.

Редактировать: быстрое обновление. Вам не нужно спать здесь. Когда вы выполняете 500 задач с 8 процессорами, каждая задача завершается за 2 секунды, завершается, и поток, на котором она выполнялась, затем выполняет следующую задачу и завершает ее.

2 голосов
/ 19 мая 2010

Вместо того, чтобы переводить сон в режим сна, вы должны позволить ему вернуться и использовать ThreadPoolexecutor для выполнения работы, публикуемой каждую минуту в вашей рабочей очереди.

1 голос
/ 20 мая 2010

Это должно делать то, что вы хотите, но не то, что вы просили :-) Вы должны вынуть Thread.sleep()

ScheduledRunnable.java

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledRunnable
{
    public static void main(final String[] args)
    {
        final int numTasks = 10;
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
        for (int i = 0; i < numTasks; i++)
        {
            ses.scheduleAtFixedRate(new MyRunnable(i), 0, 10, TimeUnit.SECONDS);
        }
    }

    private static class MyRunnable implements Runnable
    {
        private int id;
        private int numRuns;

        private MyRunnable(final int id)
        {
            this.id = id;
            this.numRuns = 0;
        }

        @Override
        public void run()
        {
            this.numRuns += 1;
            System.out.format("%d - %d\n", this.id, this.numRuns);
        }
    }
}

Это расписание Runnables каждые 10 секунд, чтобы показать поведение. Если вам действительно нужно подождать фиксированное количество времени ПОСЛЕ обработка завершена, вам, возможно, придется поэкспериментировать, какой метод .scheduleXXX вам нужен. Я думаю, что fixedWait будет запускать его каждый раз N независимо от того, какое время выполнения.

1 голос
/ 19 мая 2010

8 Потоки - это максимум, который может обрабатывать ваша система, и вы замедляете себя переключением контекста.

Посмотрите на эту статью http://www.informit.com/articles/article.aspx?p=1339471&seqNum=4 Это даст вам обзор того, как это сделать.

0 голосов
/ 20 мая 2010

Можете ли вы переписать свой проект для использования некоторой агентной среды параллелизма, такой как Akka ?

0 голосов
/ 19 мая 2010

Мне нужны отдельные потоки для каждой выполняемой задачи, поэтому изменение архитектуры не вариант.

Если верно (например, при вызове функции внешней блокировки), создайте для них отдельные потоки и запустите их. Вы не можете создать пул потоков с ограниченным числом потоков, так как блокирующая функция в одном из потоков предотвратит вставку в него любого другого запускаемого объекта и не сильно увеличит создание пула потоков с одним потоком на задачу.

Я попытался сделать размер моего ThreadPool равным Runtime.getRuntime (). AvailableProcessors (), который пытался запустить все 500 потоков, но только 8 (4xhyperthreading) из них выполнялись.

Когда вы передаете объекты Thread, которые вы создаете, в пул потоков, он видит только то, что они реализуют Runnable. Поэтому он будет запускаться каждый Runnable до завершения. Любой цикл, который останавливает возврат метода run(), не позволит запустить следующую поставленную в очередь задачу; например:

public static void main (String...args) {
    ExecutorService executor = Executors.newFixedThreadPool(2);

    for (int i = 0; i < 10; ++i) {
        final int task = i;

        executor.execute(new Runnable () {
        private long lastRunTime = 0;
            @Override
            public void run () {

                for (int iteration = 0; iteration < 4; )
                {
                    if (System.currentTimeMillis() - this.lastRunTime > TIME_OUT)
                    {
                        // do your work here
                        ++iteration;
                        System.out.printf("Task {%d} iteration {%d} thread {%s}.\n", task, iteration, Thread.currentThread());

                        this.lastRunTime = System.currentTimeMillis();
                    }
                    else
                    {
                        Thread.yield(); // otherwise, let other threads run
                    }
                }
            }
        });
    }

    executor.shutdown();
}

распечатывает:

Task {0} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {0} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
Task {2} iteration {1} thread {Thread[pool-1-thread-1,5,main]}.
Task {1} iteration {4} thread {Thread[pool-1-thread-2,5,main]}.
Task {3} iteration {1} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {2} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {2} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {3} thread {Thread[pool-1-thread-1,5,main]}.
Task {3} iteration {3} thread {Thread[pool-1-thread-2,5,main]}.
Task {2} iteration {4} thread {Thread[pool-1-thread-1,5,main]}.
...

показывает, что первые задачи (размер пула потоков) выполняются до того, как запланированы следующие задачи.

Что вам нужно сделать, это создать задачи, которые будут выполняться некоторое время, а затем запускать другие задачи. Как вы их структурируете, зависит от того, чего вы хотите достичь

  • хотите ли вы, чтобы все задачи выполнялись одновременно, все ждут минуту, затем все снова запускаются одновременно или же задачи не синхронизированы друг с другом
  • действительно ли вы хотели, чтобы каждая задача выполнялась с интервалом в одну минуту
  • независимо от того, блокируются ли ваши задачи или нет, и поэтому действительно требуют отдельных потоков
  • какое поведение ожидается, если задача блокируется дольше ожидаемого окна для запуска
  • какое поведение ожидается, если задача блокируется дольше, чем частота повторения (блокируется более одной минуты)

В зависимости от ответов на них для координации задач может использоваться некоторая комбинация ScheduledExecutorService, семафоров или мьютексов. Самый простой случай - неблокирующие, несинхронные задачи, в этом случае используйте ScheduledExecutorService напрямую для запуска ваших исполняемых файлов раз в минуту.

0 голосов
/ 19 мая 2010

Вы, безусловно, можете добиться некоторого улучшения пропускной способности, сократив количество потоков до уровня, который система реально может обрабатывать. Вы готовы немного изменить дизайн темы? Планировщик не будет загружать спящие в очередь вместо того, чтобы фактически иметь сотни спящих потоков.

class RepeatingWorker implements Runnable {

private ExecutorService executor;
private Date lastRan;

//constructor takes your executor

@Override
public void run() {

  try {
    if (now > lastRan + ONE_MINUTE) {
      //do job
      lastRan = now;
    } else {
      return;
  } finally {
    executor.submit(this);
  }
}
}

Это сохраняет вашу основную семантику «задание повторяется бесконечно, но ждет не менее одной минуты между выполнениями», но теперь вы можете настроить пул потоков на то, что может обрабатывать машина, а те, которые не работают, вместо этого находятся в очереди слоняться в планировщике как спящие темы. Если кто-то на самом деле ничего не делает, то можно ожидать, что он занят, но из вашего поста я предполагаю, что вся цель приложения состоит в том, чтобы запустить эти потоки, и в настоящее время они используют ваши процессоры. Возможно, вам придется настроиться на это, если вам нужно освободить место для других вещей:)

...