Самый легкий примитив синхронизации для очереди рабочих потоков - PullRequest
7 голосов
/ 28 сентября 2010

Я собираюсь реализовать рабочий поток с очередями рабочих элементов, и пока я размышлял над проблемой, я хотел знать, справляюсь ли я лучше всего.

В обсуждаемом потокеиметь некоторые локальные данные потока (предварительно инициализированные при построении) и зацикливаться на рабочих элементах до тех пор, пока не будет выполнено некоторое условие.

псевдокод:

volatile bool run = true;

int WorkerThread(param)
{
    localclassinstance c1 = new c1();
    [other initialization]

    while(true) {
        [LOCK]
        [unqueue work item]
        [UNLOCK]
        if([hasWorkItem]) {
            [process data]
            [PostMessage with pointer to data]
        }
        [Sleep]

        if(!run)
            break;
    }

    [uninitialize]
    return 0;
}

Я предполагаю, что сделаю блокировку с помощьюраздел, так как очередь будет std :: vector или std :: queue, но, возможно, есть лучший способ.

Часть со сном выглядит не слишком хорошо, так как будет много лишнихSleep с большими значениями Sleep или множеством дополнительных блокировок, когда значение Sleep мало, и это определенно не нужно.

Но я не могу представить себе дружественный примитив WaitForSingleObject, который я мог бы использовать вместо критической секции, посколькубыть двумя потоками в очереди рабочих элементов одновременно.Таким образом, Event, который кажется наилучшим кандидатом, может потерять второй рабочий элемент, если он уже установлен, и это не гарантирует взаимное исключение.

Возможно, есть даже лучший подход с видом InterlockedExchange.функций, что приводит к еще меньшей сериализации.

PS: мне может понадобиться предварительно обработать всю очередь и отбросить устаревшие рабочие элементы на этапе отмены обработки.

Ответы [ 9 ]

5 голосов
/ 28 сентября 2010

Существует множество способов сделать это.

Один из вариантов - использовать семафор для ожидания.Семафор сигнализируется каждый раз, когда значение помещается в очередь, поэтому рабочий поток будет блокироваться, только если в очереди нет элементов.Для этого по-прежнему потребуется отдельная синхронизация в самой очереди.

Второй вариант - использовать событие ручного сброса, которое устанавливается, когда в очереди есть элементы, и очищается, когда очередь пуста.Опять же, вам нужно будет выполнить отдельную синхронизацию в очереди.

Третий вариант - создать в потоке невидимое окно только для сообщений и использовать специальное сообщение WM_USER или WM_APP для публикации.элементы в очередь, прикрепляя элемент к сообщению через указатель.

Другой вариант - использовать условные переменные .Собственные переменные условия Windows работают, только если вы нацелены на Windows Vista или Windows 7, но переменные условия также доступны для Windows XP с Boost или реализацией библиотеки потоков C ++ 0x.Пример очереди с использованием переменных условия повышения доступен в моем блоге: http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

3 голосов
/ 28 сентября 2010

Можно разделить ресурс между потоками, вообще не используя блокирующие блокировки, если ваш сценарий соответствует определенным требованиям.

Вам нужен примитив для обмена атомарными указателями, например, Win32 InterlockedExchange . Большинство процессорных архитектур обеспечивают некоторый атомарный обмен, и обычно он намного дешевле, чем получение формальной блокировки.

Вы можете хранить свою очередь рабочих элементов в переменной-указателе, которая доступна для всех потоков, которые будут в ней заинтересованы. (глобальная переменная или поле объекта, к которому имеют доступ все потоки)

В этом сценарии предполагается, что участвующим потокам всегда есть чем заняться, и лишь изредка «заглядывают» на общий ресурс. Если вам нужен дизайн, в котором потоки блокируют ожидание ввода, используйте традиционный объект события блокировки.

Прежде чем что-либо начнется, создайте свой объект списка очередей или рабочих элементов и назначьте его переменной общего указателя.

Теперь, когда производители хотят вставить что-то в очередь, они «получают» эксклюзивный доступ к объекту очереди, обменивая значение NULL на переменную общего указателя с помощью InterlockedExchange. Если результат обмена возвращает ноль, то кто-то еще в настоящее время модифицирует объект очереди. Sleep (0), чтобы освободить оставшуюся часть времени вашего потока, затем цикл, чтобы повторить своп, пока он не вернет ненулевое значение. Даже если вы закончите цикл несколько раз, это много. во много раз быстрее, чем вызов ядра для получения объекта мьютекса. Для вызова ядра требуются сотни тактов для перехода в режим ядра.

Когда вы успешно получите указатель, внесите изменения в очередь, а затем поменяйте указатель очереди обратно на общий указатель.

Когда вы потребляете элементы из очереди, вы делаете то же самое: поменяйте местами нулевое значение в общем указателе и зацикливайтесь, пока не получите ненулевой результат, оперируйте объектом в локальной переменной, а затем поменяйте его обратно в общую указатель вар.

Эта техника представляет собой комбинацию атомного обмена и коротких циклов вращения. Это хорошо работает в сценариях, где вовлеченные потоки не заблокированы, и столкновения редки. Большую часть времени подкачка будет давать вам эксклюзивный доступ к общему объекту с первой попытки, и, поскольку длительность времени, в течение которого объект очереди удерживается исключительно каким-либо потоком, очень мала, ни один поток не должен будет зацикливаться больше, чем несколько раз, прежде чем объект очереди снова станет доступным.

Если вы ожидаете много конфликтов между потоками в своем сценарии или хотите, чтобы потоки блокировали большую часть своего времени в ожидании выполнения работы, вам может быть лучше обслуживать формальный объект синхронизации мьютекса.

2 голосов
/ 28 сентября 2010

Самым быстрым блокирующим примитивом обычно является спин-блокировка или спин-блокировка сна.CRITICAL_SECTION - это как раз такая (пользовательское пространство) спин-сон-блокировка.(Ну, конечно, если не использовать блокирующие примитивы вообще. Но это означает использование структур данных без блокировок, и это действительно очень трудно понять правильно.)

Что касается избежания Sleep: имейтепосмотрите на условные переменные.Они предназначены для использования вместе с «мьютексом», и я думаю, что их гораздо проще использовать правильно, чем СОБЫТИЯ Windows.

Boost.Thread имеет хорошую переносимую реализацию обоих, быстрых пользовательскихпробелы блокировки спина и переменные условия:

http://www.boost.org/doc/libs/1_44_0/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref

Рабочая очередь с использованием Boost.Thread может выглядеть примерно так:

template <class T>
class Queue : private boost::noncopyable
{
public:
    void Enqueue(T const& t)
    {
        unique_lock lock(m_mutex);

        // wait until the queue is not full
        while (m_backingStore.size() >= m_maxSize)
            m_queueNotFullCondition.wait(lock); // releases the lock temporarily

        m_backingStore.push_back(t);
        m_queueNotEmptyCondition.notify_all(); // notify waiters that the queue is not empty
    }

    T DequeueOrBlock()
    {
        unique_lock lock(m_mutex);

        // wait until the queue is not empty
        while (m_backingStore.empty())
            m_queueNotEmptyCondition.wait(lock); // releases the lock temporarily

        T t = m_backingStore.front();
        m_backingStore.pop_front();

        m_queueNotFullCondition.notify_all(); // notify waiters that the queue is not full

        return t;
    }

private:
    typedef boost::recursive_mutex mutex;
    typedef boost::unique_lock<boost::recursive_mutex> unique_lock;

    size_t const m_maxSize;

    mutex mutable m_mutex;
    boost::condition_variable_any m_queueNotEmptyCondition;
    boost::condition_variable_any m_queueNotFullCondition;

    std::deque<T> m_backingStore;
};
1 голос
/ 28 сентября 2010

Здесь вы можете взглянуть на другой подход, использующий атомарные операции C ++ 0x

http://www.drdobbs.com/high-performance-computing/210604448

1 голос
/ 28 сентября 2010

Я бы немного реструктурировал:

WorkItem GetWorkItem()
{
    while(true)
    {
        WaitForSingleObject(queue.Ready);
        {
            ScopeLock lock(queue.Lock);
            if(!queue.IsEmpty())
            {
                return queue.GetItem();
            }
        }
    }
}

int WorkerThread(param) 
{ 
    bool done = false;
    do
    {
        WorkItem work  = GetWorkItem();
        if( work.IsQuitMessage() )
        {
            done = true;
        }
        else
        {
            work.Process();
        }
    } while(!done);

    return 0; 
} 

Достопримечательности:

  1. ScopeLock - это класс RAII , чтобы сделать использование критического участка более безопасным.
  2. Блокировать событие до тех пор, пока рабочий элемент (возможно) не будет готов - , затем блокировать, пока пытается удалить его из очереди.
  3. не использовать глобальный флаг "IsDone", поставьте в очередь специальное сообщение quitmessage WorkItem s.
1 голос
/ 28 сентября 2010

Существуют различные способы сделать это

Например, вы можете создать событие, которое называется «запустить», а затем использовать его, чтобы определить, когда поток должен завершиться, тогда основной поток подает сигнал. Вместо сна вы должны использовать WaitForSingleObject с тайм-аутом, так что вы будете выходить напрямую, а не ждать мс сна.

Другой способ - принять сообщения в цикле, а затем придумать пользовательское сообщение, которое вы публикуете в теме

РЕДАКТИРОВАТЬ: в зависимости от ситуации также может быть целесообразно иметь еще один поток, который отслеживает этот поток, чтобы проверить, мертв ли ​​он или нет, это может быть сделано с помощью вышеупомянутой очереди сообщений, поэтому ответ на определенное сообщение в течение х мс будет означать, что поток не заблокирован.

0 голосов
/ 28 сентября 2010

Учитывая, что этот вопрос помечен windows, я отвечу так:

Не создавать 1 рабочий поток. Ваши рабочие задания, по-видимому, независимы, поэтому вы можете обрабатывать несколько заданий одновременно? Если так:

  • В вашем основном потоке вызовите CreateIOCompletionPort для создания объекта порта завершения io.
  • Создать пул рабочих потоков. Количество, которое вам нужно создать, зависит от того, сколько заданий вы хотите обслуживать параллельно. Некоторое количество ядер процессора - хорошее начало.
  • Каждый раз, когда задание приходит в вызов, PostQueuedCompletionStatus () передает указатель на структуру задания в виде структуры lpOverlapped.
  • Каждый рабочий поток вызывает GetQueuedCompletionItem () - извлекает рабочий элемент из указателя lpOverlapped и выполняет работу, прежде чем вернуться к GetQueuedCompletionStatus.

Это выглядит тяжело, но порты завершения io реализованы в режиме ядра и представляют собой очередь, которая может быть десериализована в любой из рабочих потоков, связанных с этой очередью (то есть ожидающих вызова GetQueuedCompletionStatus). Порт завершения io знает, сколько потоков, которые обрабатывают элемент, фактически используют ЦП и заблокированы при вызове ввода-вывода - и освободит больше рабочих потоков из пула, чтобы обеспечить соблюдение счетчика одновременности.

Итак, он не легкий, но очень эффективный ... Порт завершения может быть связан, например, с дескрипторами канала и сокета и может исключать результаты асинхронных операций над этими дескрипторами. Проекты портов завершения io могут масштабироваться для обработки десятков тысяч соединений с сокетами на одном сервере, но на настольных компьютерах мира это очень удобный способ масштабирования обработки заданий на 2 или 4 ядра, которые теперь распространены на настольных ПК.

0 голосов
/ 28 сентября 2010

Храните сигнализацию и синхронизацию отдельно. Что-то в этом роде ...

// in main thread

HANDLE events[2];
events[0] = CreateEvent(...); // for shutdown
events[1] = CreateEvent(...); // for work to do

// start thread and pass the events

// in worker thread

DWORD ret;
while (true)
{
   ret = WaitForMultipleObjects(2, events, FALSE, <timeout val or INFINITE>);

   if shutdown
      return
   else if do-work
      enter crit sec
      unqueue work
      leave crit sec
      etc.
   else if timeout
      do something else that has to be done
}
0 голосов
/ 28 сентября 2010

Используйте семафор вместо события.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...