Thread-Pool с несколькими ограничениями - PullRequest
5 голосов
/ 03 февраля 2012

Мне нужен пул потоков, который предоставляет максимум X потоков для обработки задач, но пока проблем нет.Однако в каждой отправляемой задаче может быть задана целевая цель ввода-вывода (скажем, Y).

Таким образом, отправленная IOTask возвращает цель "google.com" с пределом 4 (Y), а пул имеет глобальный предел16 (Х).Я хочу отправить 10 google.com-задач, в которых параллельно обрабатываются только 4, а в пуле 12 свободных потоков для других задач.

Как мне этого добиться?

Ответы [ 6 ]

3 голосов
/ 03 февраля 2012

Реализация этой функциональности непроста, поскольку вам нужно либо иметь отдельные очереди для каждой цели (так что код ожидания становится намного более сложным), либо одну очередь, из которой вы затем пропускаете целевые объекты, которые загружены (что приводит к снижению производительности)).Вы можете попытаться расширить ExecutorService для достижения этой цели, но расширение кажется нетривиальным.

Обновленный ответ / решение:

Подумав немного об этом, самое простое решениепроблема блокировки состоит в том, чтобы иметь как очередь блокировки (как обычно), так и карту очередей (одна очередь на цель, а также количество доступных потоков на цель).Карта очередей используется только для задач, которые были переданы для выполнения (из-за слишком большого количества потоков, уже запущенных для этой цели) после того, как задача извлечена из обычной очереди блокировки.

Таким образом, поток выполнения будетвыглядит следующим образом:

  1. задание отправляется (с определенной целью) с помощью вызывающего кода.
  2. Задача помещается в очередь блокировки (вероятно, заключенная в нее вашим собственным классом задачи, который включает в себя цельинформация).
  3. поток (из пула потоков) ожидает в очереди блокировки (с помощью take ()).
  4. поток принимает отправленную задачу.
  5. поток синхронизируется поблокировка.
  6. поток проверяет доступное количество для этой цели.
  7. , если доступное количество> 0

    • , то поток уменьшает количество на 1, освобождаетблокировка, запускает задачу.
    • иначе поток помещает задачу в карту цели в очередь задач (эта карта представляет собой , переданную по карте задач ), снимает блокировку и возвращается к ожиданиюна тблокирующая очередь.
  8. когда поток завершает выполнение задачи, он:

    • синхронизируется при блокировке.
    • проверяетсчитать для цели, которую она только что выполнила.
    • если счет == 0
      • , то проверьте переданную карту задач для любых задач для этой цели, если она существует, затем снимите блокировку и запустите ее.
    • , если для счетчика не было 0 или для той же цели не было заданий в переданной карте / очереди, затем увеличьте доступное количество (для этой цели), снимите блокировку ивернитесь к ожиданию в очереди блокировки.

Это решение позволяет избежать значительного снижения производительности или использования отдельного потока только для управления очередью.

2 голосов
/ 28 апреля 2012

Вы можете заключить два экземпляра ExecutorService в пользовательский класс и вручную управлять отправкой задач следующим образом:

class ExecutorWrapper {

    private ExecutorService ioExec = Executors.newFixedThreadPool(4);
    private ExecutorService genExec = Executors.newFixedThreadPool(12);

    public Future<?> submit(final IOTask task) {
        return ioExec.submit(task);
    }

    public Future<?> submit(final Runnable task) {
        return genExec.submit(task);
    }
}

interface IOTask extends Runnable {}

Это позволяет вам использовать 4 потока для выполнения операций ввода-вывода и оставляет 12 других потоков для обслуживания других задач.

2 голосов
/ 05 февраля 2012

Обдумывая некоторые ответы более определенным образом.

  1. Вам потребуется собственный BlockingQueue, который может разделять различные типы задач и возвращать желаемый Runnable в зависимости от внутреннего счетчика.

  2. Расширить ThreadPoolExecutor и внедрить beforeExecute и afterExecute.

Когда вызывается beforeExecute, он увеличивает счетчик в очереди, если Runnable имеет тип X. Когда вызывается afterExecute, он уменьшает этот счетчик.

В вашей очереди вы бы затем вернули соответствующий Runnable в зависимости от значения счетчика, я полагаю, что метод take - это то место, где вы это сделаете.

Здесь есть некоторые проблемы с синхронизацией, которые должны быть полностью продуманы, чтобы гарантировать, что счетчик никогда не выйдет за пределы 4. К сожалению, как только вы находитесь в beforeExecute, уже слишком поздно, но вы можете просто знать, сколько из какой задачи выполняется на данное время может начать вас.

1 голос
/ 03 февраля 2012

Используйте счетчик для общего количества потоков и HashMap, который подсчитывает количество потоков, в данный момент пытающихся получить доступ к сайту X. Когда вы хотите начать новый поток, вызовите синхронизированный метод, который проверяет wait (wait () внутри цикла while) доколичество потоков в хэш-карте меньше 4, а общее количество потоков меньше 16. Затем увеличьте оба счетчика и запустите поток.Когда поток завершается, он должен вызвать второй синхронизированный метод, который уменьшает счетчики и вызывает notify ()

1 голос
/ 03 февраля 2012

Идея для этого может заключаться в расширении ExecutorService, и в вашем классе есть два ThreadPools, один с емкостью 4, а другой с емкостью 12.

Затем реализуйте необходимые вам методы и, основываясь на представленных IOT-задачах, вы сможете направить задачи, в какой пул вы хотели бы перейти.

1 голос
/ 03 февраля 2012

Хммм ... Боюсь, что выход ExecutorService не позволяет такой точный контроль зерна. Возможно, вам придется либо расширить класс ExecutorService, чтобы добавить эту функцию самостоятельно, либо использовать два отдельных фиксированных пула потоков, один с емкостью 4, другой с емкостью 12.

...