Обеспечение порядка выполнения задач в пуле потоков - PullRequest
48 голосов
/ 25 августа 2011

Я читал о шаблоне пула потоков, и я не могу найти обычное решение для следующей проблемы.

Иногда я хочу, чтобы задачи выполнялись последовательно. Например, я читаю куски текста из файла и по какой-то причине мне нужно обрабатывать куски в таком порядке. Поэтому я хочу исключить параллелизм для некоторых задач .

Рассмотрим этот сценарий, когда задачи с * необходимо обрабатывать в том порядке, в котором они были вставлены. Другие задачи можно обрабатывать в любом порядке.

push task1
push task2
push task3   *
push task4   *
push task5
push task6   *
....
and so on

В контексте пула потоков, без этого ограничения, одиночная очередь ожидающих задач работает нормально, но здесь явно нет.

Я думал о том, чтобы некоторые из потоков работали в определенной для потока очереди, а другие - в "глобальной" очереди. Затем, чтобы последовательно выполнить некоторые задачи, мне просто нужно поместить их в очередь, в которую смотрит один поток. Это делает звучит немного неуклюже.

Итак, настоящий вопрос в этой длинной истории: как бы вы решили это? Как бы вы обеспечили заказ этих задач ?

EDIT

В качестве более общей проблемы предположим, что приведенный выше сценарий становится

push task1
push task2   **
push task3   *
push task4   *
push task5
push task6   *
push task7   **
push task8   *
push task9
....
and so on

Я имею в виду, что задачи внутри группы должны выполняться последовательно, но сами группы могут смешиваться. Например, вы можете иметь 3-2-5-4-7.

Еще одна вещь, на которую следует обратить внимание, - это то, что у меня нет доступа ко всем задачам в группе заранее (и я не могу дождаться, пока все они придут, прежде чем начать группу).

Спасибо за ваше время.

Ответы [ 17 ]

17 голосов
/ 01 сентября 2011

Что-то вроде следующего позволит поставить в очередь последовательные и параллельные задачи, где последовательные задачи будут выполняться одна за другой, а параллельные задачи будут выполняться в любом порядке, но параллельно.Это дает вам возможность при необходимости сериализовать задачи, также иметь параллельные задачи, но делать это по мере получения задач, т. Е. Вам не нужно знать обо всей последовательности заранее, порядок выполнения поддерживается динамически.

internal class TaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Queue<QTask> _tasks = new Queue<QTask>();
    private int _runningTaskCount;

    public void Queue(bool isParallel, Action task)
    {
        lock (_syncObj)
        {
            _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
        }

        ProcessTaskQueue();
    }

    public int Count
    {
        get{lock (_syncObj){return _tasks.Count;}}
    }

    private void ProcessTaskQueue()
    {
        lock (_syncObj)
        {
            if (_runningTaskCount != 0) return;

            while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
            {
                QTask parallelTask = _tasks.Dequeue();

                QueueUserWorkItem(parallelTask);
            }

            if (_tasks.Count > 0 && _runningTaskCount == 0)
            {
                QTask serialTask = _tasks.Dequeue();

                QueueUserWorkItem(serialTask);
            }
        }
    }

    private void QueueUserWorkItem(QTask qTask)
    {
        Action completionTask = () =>
        {
            qTask.Task();

            OnTaskCompleted();
        };

        _runningTaskCount++;

        ThreadPool.QueueUserWorkItem(_ => completionTask());
    }

    private void OnTaskCompleted()
    {
        lock (_syncObj)
        {
            if (--_runningTaskCount == 0)
            {
                ProcessTaskQueue();
            }
        }
    }

    private class QTask
    {
        public Action Task { get; set; }
        public bool IsParallel { get; set; }
    }
}

Обновление

Для обработки групп задач с последовательным и параллельным смешением задач, GroupedTaskQueue может управлять TaskQueue для каждой группы.Опять же, вам не нужно заранее знать о группах, все это динамически управляется по мере получения задач.

internal class GroupedTaskQueue
{
    private readonly object _syncObj = new object();
    private readonly Dictionary<string, TaskQueue> _queues = new Dictionary<string, TaskQueue>();
    private readonly string _defaultGroup = Guid.NewGuid().ToString();

    public void Queue(bool isParallel, Action task)
    {
        Queue(_defaultGroup, isParallel, task);
    }

    public void Queue(string group, bool isParallel, Action task)
    {
        TaskQueue queue;

        lock (_syncObj)
        {
            if (!_queues.TryGetValue(group, out queue))
            {
                queue = new TaskQueue();

                _queues.Add(group, queue);
            }
        }

        Action completionTask = () =>
        {
            task();

            OnTaskCompleted(group, queue);
        };

        queue.Queue(isParallel, completionTask);
    }

    private void OnTaskCompleted(string group, TaskQueue queue)
    {
        lock (_syncObj)
        {
            if (queue.Count == 0)
            {
                _queues.Remove(group);
            }
        }
    }
}
14 голосов
/ 25 августа 2011

Пулы потоков хороши для случаев, когда относительный порядок задач не имеет значения, если все они выполнены. В частности, должно быть нормально, чтобы все они выполнялись параллельно.

Если ваши задачи должны выполняться в определенном порядке, то они не подходят для параллелизма, поэтому пул потоков не подходит.

Если вы хотите переместить эти последовательные задачи из основного потока, то для этих задач подойдет один фоновый поток с очередью задач. Вы можете продолжать использовать пул потоков для оставшихся задач, которые подходят для параллелизма.

Да, это означает, что вы должны решить, куда отправить задачу, в зависимости от того, является ли она задачей в порядке или «может быть распараллелена», но это не имеет большого значения.

Если у вас есть группы, которые должны быть сериализованы, но которые могут работать параллельно с другими задачами, у вас есть несколько вариантов:

  1. Создайте отдельную задачу для каждой группы, которая выполняет соответствующие групповые задачи по порядку, и опубликуйте эту задачу в пуле потоков.
  2. Пусть каждая задача в группе явно ожидает предыдущую задачу в группе и публикует их в пуле потоков. Это требует, чтобы ваш пул потоков мог обрабатывать случай, когда поток ожидает еще не запланированную задачу без взаимоблокировки.
  3. Иметь отдельную ветку для каждой группы и публиковать групповые задачи в соответствующей очереди сообщений.
8 голосов
/ 02 сентября 2011

В основном, есть ряд незавершенных задач.Некоторые из задач могут быть выполнены только после завершения выполнения одной или нескольких ожидающих задач.

Ожидающие задачи могут быть смоделированы в графе зависимостей:

  • "задача 1 ->задача 2 означает, что задача 2 может быть выполнена только после завершения задачи 1.стрелки указывают в направлении порядка выполнения.
  • степень задачи (количество задач, указывающих на нее) определяет, готова ли задача к выполнению.Если степень равна 0, она может быть выполнена.
  • иногда задача должна ждать завершения нескольких задач, тогда степень равна> 1.
  • , если задача не должна ждатьчтобы другие задачи завершались (его степень не равна нулю), его можно отправить в пул потоков с рабочими потоками или в очередь с задачами, ожидающими обработки рабочим потоком.Вы знаете, что отправленное задание не приведет к взаимоблокировке, потому что задание ничего не ждет.В качестве оптимизации вы можете использовать приоритетную очередь, например, в которой задачи, от которых зависит больше задач в графе зависимостей, будут выполняться первыми.Это также не может спровоцировать тупик, потому что все задачи в пуле потоков могут быть выполнены.Однако это может спровоцировать голодание.
  • Если задача завершает выполнение, ее можно удалить из графа зависимостей, что может уменьшить степень других задач, которые, в свою очередь, могут быть переданы в пул рабочих потоков.

Таким образом, существует (по крайней мере) один поток, используемый для добавления / удаления отложенных задач, и существует пул потоков рабочих потоков.

Когда задача добавляется в граф зависимостей, вы должны проверить:

  • как задача связана в графе зависимостей: какие задачи должны ждать, чтобы завершиться, и какие задачи должны ждать, пока она завершится?Нарисуйте соединения от и до новой задачи соответственно.
  • после того, как соединения нарисованы: новые соединения вызвали какие-либо циклы в графе зависимостей?В этом случае возникает тупиковая ситуация.

Производительность :

  • Этот шаблон медленнее, чем последовательное выполнение, если параллельное выполнение на самом деле редко возможнопотому что вам все равно нужно дополнительное администрирование, чтобы делать все почти последовательно.
  • этот шаблон работает быстро, если на практике можно одновременно выполнять много задач.

Допущения :

Поскольку вы, возможно, читали между строк, вы должны проектировать задачи так, чтобы они не мешали другим задачам.Также должен быть способ определения приоритета задач.Приоритет задачи должен включать данные, обрабатываемые каждой задачей.Две задачи не могут изменять один и тот же объект одновременно;вместо этого одна из задач должна иметь приоритет над другой, иначе выполняемые операции над объектом должны быть поточно-ориентированными.

6 голосов
/ 31 августа 2011

Чтобы сделать то, что вы хотите сделать с пулом потоков, вам может потребоваться создать какой-то планировщик.

Примерно так:

TaskQueue -> Планировщик -> Очередь -> ThreadPool

Планировщик работает в своем собственном потоке, отслеживая зависимости между заданиями. Когда задание готово к выполнению, планировщик просто помещает его в очередь для пула потоков.

ThreadPool, возможно, придется отправлять сигналы планировщику, чтобы указать, когда задание выполнено, чтобы планировщик мог помещать задания в зависимости от этого задания в очередь.

В вашем случае зависимости могут быть сохранены в связанном списке.

Допустим, у вас есть следующие зависимости: 3 -> 4 -> 6 -> 8

Задание 3 выполняется в пуле потоков, вы все еще не знаете, существует ли задание 8.

Работа 3 заканчивается. Вы удаляете 3 из связанного списка, вы помещаете задание 4 в очередь в пул потоков.

Работа 8 приходит. Вы помещаете это в конец связанного списка.

Единственными конструкциями, которые должны быть полностью синхронизированы, являются очереди до и после планировщика.

4 голосов
/ 25 августа 2011

Если я правильно понимаю проблему, исполнители jdk не имеют этой возможности, но вы легко можете сделать свою собственную. Вам в основном нужно

  • пул рабочих потоков, каждый из которых имеет выделенную очередь
  • некоторая абстракция над теми очередями, которым вы предлагаете работу (см. ExecutorService)
  • некоторый алгоритм, который детерминистически выбирает определенную очередь для каждой части работы
  • каждая часть работы затем получает предложения в нужную очередь и, следовательно, обрабатывается в правильном порядке

Разница с исполнителями jdk заключается в том, что у них есть 1 очередь с n потоками, но вы хотите, чтобы n очередей и m потоков (где n может или не может равняться m)

* редактировать после прочтения, что каждое задание имеет ключ *

Более подробно

  • написать некоторый код, который преобразует ключ в индекс (целое число) в заданном диапазоне (0-n, где n - это количество потоков, которое вы хотите), это может быть просто key.hashCode() % n или это может быть статическое отображение известных значений ключей в потоки или что угодно
  • при запуске
    • создать n очередей, поместить их в индексированную структуру (массив, список, что угодно)
    • запускает n потоков, каждый поток просто делает блокировку из очереди
    • когда он получает какую-то работу, он знает, как выполнить работу, специфичную для этой задачи / события (очевидно, вы можете иметь некоторое отображение задач в действия, если у вас есть разнородные события)
  • хранить это за фасадом, который принимает рабочие элементы
  • когда придет задание, передайте его на фасад
    • фасад находит правильную очередь для задачи на основе ключа, предлагает ее в эту очередь

достаточно просто добавить автоматический перезапуск рабочих потоков в эту схему, вам просто нужно, чтобы рабочий поток зарегистрировался у какого-то менеджера, чтобы заявить «Я владею этой очередью», а затем поработать над этим + обнаружение ошибок в потоке ( это означает, что он отменяет регистрацию владельца этой очереди, возвращая очередь в свободный пул очередей, что является триггером для запуска нового потока)

4 голосов
/ 02 сентября 2011

Я думаю, что пул потоков может быть эффективно использован в этой ситуации.Идея состоит в том, чтобы использовать отдельный объект strand для каждой группы зависимых задач.Вы добавляете задачи в свою очередь с помощью объекта или без strand.Вы используете один и тот же объект strand с зависимыми задачами.Ваш планировщик проверяет, имеет ли следующая задача strand и заблокирована ли эта strand.Если нет - заблокируйте strand и запустите эту задачу.Если strand уже заблокирован - удерживайте эту задачу в очереди до следующего события планирования.Когда задание выполнено, разблокируйте его strand.

В результате вам понадобится одна очередь, вам не нужны никакие дополнительные потоки, сложные группы и т. Д. strand Объект может быть очень простым с помощью двух методов lock и unlock.

Я часто сталкиваюсь с одной и той же проблемой проектирования, например, для асинхронного сетевого сервера, который обрабатывает несколько одновременных сеансов.Сеансы независимы (это сопоставляет их с вашими независимыми задачами и группами зависимых задач), когда задачи внутри сессий являются зависимыми (это сопоставляет внутренние задачи сеанса с вашими зависимыми задачами внутри группы).Используя описанный подход, я полностью избегаю явной синхронизации внутри сессии.Каждый сеанс имеет собственный strand объект.

И более того, я использую существующую (отличную) реализацию этой идеи: Boost Asio library (C ++).Я просто использовал их термин strand.Реализация элегантна: я обертываю моих асинхронных задач в соответствующий strand объект до их планирования.

3 голосов
/ 30 августа 2011

Ответы, предлагающие не использовать пул потоков, подобны жесткому программированию знания зависимостей задач / порядка выполнения. Вместо этого я бы создал CompositeTask, который управляет зависимостью начала / конца между двумя задачами. Инкапсулируя зависимость в интерфейсе задач, все задачи можно обрабатывать единообразно и добавлять в пул. Это скрывает детали выполнения и позволяет изменять зависимости задачи, не влияя на то, используете ли вы пул потоков или нет.

В вопросе не указан язык - я буду использовать Java, который, я надеюсь, будет удобочитаемым для большинства.

class CompositeTask implements Task
{
    Task firstTask;
    Task secondTask;

    public void run() {
         firstTask.run();
         secondTask.run();
    }
}

Выполняет задачи последовательно и в одном потоке. Вы можете объединить в цепочку множество CompositeTask, чтобы создать последовательность из стольких последовательных задач, сколько необходимо.

Недостатком здесь является то, что это связывает поток на время всех задач, выполняемых последовательно. У вас могут быть другие задачи, которые вы предпочитаете выполнять между первой и второй задачами. Поэтому вместо непосредственного выполнения второй задачи составьте график выполнения составной задачи для второй задачи:

class CompositeTask implements Runnable
{
    Task firstTask;
    Task secondTask;
    ExecutorService executor;

    public void run() {
         firstTask.run();
         executor.submit(secondTask);
    }
}

Это гарантирует, что вторая задача не будет запущена до тех пор, пока первая задача не будет завершена, а также позволяет пулу выполнять другие (возможно, более срочные) задачи. Обратите внимание, что первая и вторая задачи могут выполняться в отдельных потоках, поэтому, хотя они и не выполняются одновременно, любые общие данные, используемые задачами, должны быть видны другим потокам (например, путем создания переменных volatile.)

Это простой, но мощный и гибкий подход, позволяющий самим задачам определять ограничения выполнения, а не делать это с помощью различных пулов потоков.

3 голосов
/ 26 августа 2011

Вариант 1 - Комплексный

Поскольку у вас есть последовательные задания, вы можете собрать эти задания в цепочку и позволить самим заданиям повторно отправлять их в пул потоков после их завершения. Предположим, у нас есть список рабочих мест:

 [Task1, ..., Task6]

как в вашем примере. У нас есть последовательная зависимость, так что [Task3, Task4, Task6] является цепочкой зависимостей. Теперь мы делаем работу (псевдокод Erlang):

 Task4Job = fun() ->
               Task4(), % Exec the Task4 job
               push_job(Task6Job)
            end.
 Task3Job = fun() ->
               Task3(), % Execute the Task3 Job
               push_job(Task4Job)
            end.
 push_job(Task3Job).

То есть мы изменяем задание Task3, превращая его в задание, которое в качестве продолжения переносит следующее задание в очереди в пул потоков. Существуют сильные сходства с общим стилем передачи продолжения , который также наблюдается в таких системах, как Node.js или Pythons Twisted framework.

Обобщая, вы создаете систему, в которой вы можете определить цепочки заданий, которые могут defer продолжить работу и повторно передать дальнейшую работу.

Вариант 2 - Простой

Почему мы вообще пытаемся разделить работу? Я имею в виду, поскольку они зависят друг от друга, выполнение всех из них в одном и том же потоке не будет быстрее или медленнее, чем выполнение этой цепочки и ее распределение по нескольким потокам. Предполагая «достаточную» рабочую нагрузку, любой поток всегда будет работать в любом случае, так что объединить задания вместе, вероятно, проще всего:

  Task = fun() ->
            Task3(),
            Task4(), 
            Task6()  % Just build a new job, executing them in the order desired
         end,
  push_job(Task).

Довольно просто делать такие вещи, если у вас есть функции от имени граждан первого класса, поэтому вы можете строить их на своем языке по своему усмотрению, как, например, в любом функциональном языке программирования, Python, Ruby-блоках - и так далее.

Мне не особо нравится идея построения очереди или стека продолжения, как в «Варианте 1», хотя я бы определенно выбрал второй вариант. В Erlang у нас даже есть программы под названием jobs, написанные Erlang Solutions и выпущенные как Open Source. jobs создан для выполнения и загрузки, например, для выполнения заданий. Вероятно, я бы совместил вариант 2 с заданиями, если бы решил эту проблему.

3 голосов
/ 01 сентября 2011

Используйте два активных объекта . В двух словах: шаблон активного объекта состоит из приоритетной очереди и одного или нескольких рабочих потоков, которые могут получать задачи из очереди и обрабатывать их.

Так что используйте один активный объект с одним рабочим потоком: все задачи, которые будут помещены в очередь, будут обрабатываться последовательно. Использовать второй активный объект с числом рабочих потоков больше 1. В этом случае рабочие потоки будут получать и обрабатывать задачи из очереди в любом порядке.

Удача.

2 голосов
/ 25 августа 2011

Я думаю, вы смешиваете понятия. Threadpool - это нормально, если вы хотите распределить некоторую работу между потоками, но если вы начнете смешивать зависимости между потоками, это не очень хорошая идея.

Мой совет, просто не используйте пул потоков для этих задач. Просто создайте выделенный поток и сохраняйте простую очередь из последовательных элементов, которые должны обрабатываться этим потоком в одиночку. Затем вы можете продолжать отправлять задачи в пул потоков, если у вас нет последовательных требований, и использовать выделенный поток, когда у вас есть.

Пояснение: используя здравый смысл, очередь последовательных задач должна выполняться одним потоком, обрабатывающим каждую задачу одну за другой:)

...