Реализация мьютекса FIFO в pthreads - PullRequest
12 голосов
/ 22 марта 2011

Я пытаюсь реализовать двоичное дерево, поддерживающее одновременные вставки (что может происходить даже между узлами), но без необходимости выделять глобальную блокировку или отдельный мьютекс или мьютексы для каждого узла. Скорее, количество таких выделенных блокировок должно быть порядка количества потоков с использованием дерева.

Следовательно, я сталкиваюсь с проблемой типа Lock Convo . Проще говоря, это то, что потенциально происходит, когда два или более потока выполняют следующее:

1 for(;;) {
2   lock(mutex)
3   do_stuff
4   unlock(mutex)
5 }

То есть, если поток № 1 выполняет инструкции 4-> 5-> 1-> 2, все в одном "выбросе процессора", поток № 2 истощается от выполнения.

С другой стороны, если бы была возможность блокировки типа FIFO для мьютексов в pthreads, такой проблемы можно было бы избежать. Итак, есть ли способ реализовать блокировку мьютекса FIFO-типа в pthreads? Может ли изменение приоритетов потока выполнить это?

Ответы [ 6 ]

3 голосов
/ 22 марта 2011

Можно реализовать справедливую систему очередей, в которой каждый поток добавляется в очередь, когда он блокируется, и первый поток в очереди всегда получает ресурс, когда он становится доступным.Такой «честный» тикет, построенный на примитивах pthreads, может выглядеть так:

#include <pthread.h>

typedef struct ticket_lock {
    pthread_cond_t cond;
    pthread_mutex_t mutex;
    unsigned long queue_head, queue_tail;
} ticket_lock_t;

#define TICKET_LOCK_INITIALIZER { PTHREAD_COND_INITIALIZER, PTHREAD_MUTEX_INITIALIZER }

void ticket_lock(ticket_lock_t *ticket)
{
    unsigned long queue_me;

    pthread_mutex_lock(&ticket->mutex);
    queue_me = ticket->queue_tail++;
    while (queue_me != ticket->queue_head)
    {
        pthread_cond_wait(&ticket->cond, &ticket->mutex);
    }
    pthread_mutex_unlock(&ticket->mutex);
}

void ticket_unlock(ticket_lock_t *ticket)
{
    pthread_mutex_lock(&ticket->mutex);
    ticket->queue_head++;
    pthread_cond_broadcast(&ticket->cond);
    pthread_mutex_unlock(&ticket->mutex);
}
3 голосов
/ 22 марта 2011

Вы можете сделать что-то вроде этого:

  • определяет «блокировку в очереди», которая состоит из флага «занят / занят» и связанного списка переменных условия pthread. доступ к очереди заблокирован мьютексом

  • для блокировки очереди:

    • захватить мьютекс
    • установите флажок «занят»
    • если не занят; установить занят = истина; освободить мьютекс; сделано
    • если занят; создать новое условие @ конец очереди и ждать его (освобождая мьютекс)
  • для разблокировки:

    • захватить мьютекс
    • если никакой другой поток не стоит в очереди, busy = false; освободить мьютекс; сделал
    • pthread_cond_signal первое условие ожидания
    • do not сбросить флаг «занят» - владение переходит к другому потоку
    • освобождение мьютекса
  • когда ожидающий поток разблокирован pthread_cond_signal:

    • удалить наше условие var из заголовка очереди
    • освобождение мьютекса

Обратите внимание, что мьютекс блокируется только во время изменения состояния queued_lock, а не на весь период, в течение которого queued_lock удерживается.

2 голосов
/ 22 марта 2011

Пример, который вы публикуете, не имеет решения.По сути, у вас есть только один критический раздел, и нет места для параллелизма.

Тем не менее, вы видите, что важно сократить период, в течение которого ваши потоки удерживают мьютекс, до минимума, всего лишь несколько инструкций.Это трудно вставить в динамическую структуру данных, такую ​​как дерево.Концептуально простое решение состоит в том, чтобы иметь одну блокировку чтения-записи на узел дерева.

Если вы не хотите иметь индивидуальные блокировки на узел дерева, вы можете иметь одну структуру блокировки на уровень дерева.Я бы поэкспериментировал с блокировками чтения-записи для этого.Вы можете использовать только блокировку чтения уровня узла в руке (плюс следующий уровень) при прохождении дерева.Затем, когда вы нашли правильный, чтобы вставить блокировку этого уровня для записи.

1 голос
/ 05 декабря 2011

Вы можете получить хороший Mutex с идеей, набросанной @caf, но используя атомарные операции для получения билета до того, как сделает фактическую блокировку.

#if defined(_MSC_VER)
typedef volatile LONG Sync32_t;
#define SyncFetchAndIncrement32(V) (InterlockedIncrement(V) - 1)
#elif (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__) > 40100
typedef volatile uint32_t Sync32_t;
#define SyncFetchAndIncrement32(V) __sync_fetch_and_add(V, 1)
#else
#error No atomic operations
#endif

class FairMutex {
private:
    Sync32_t                _nextTicket;
    Sync32_t                _curTicket;
    pthread_mutex_t         _mutex;
    pthread_cond_t          _cond;

public:
    inline FairMutex() : _nextTicket(0), _curTicket(0), _mutex(PTHREAD_MUTEX_INITIALIZER), _cond(PTHREAD_COND_INITIALIZER)
    {
    }
    inline ~FairMutex()
    {
        pthread_cond_destroy(&_cond);
        pthread_mutex_destroy(&_mutex);
    }
    inline void lock()
    {
        unsigned long myTicket = SyncFetchAndIncrement32(&_nextTicket);
        pthread_mutex_lock(&_mutex);
        while (_curTicket != myTicket) {
            pthread_cond_wait(&_cond, &_mutex);
        }
    }
    inline void unlock()
    {
        _curTicket++;
        pthread_cond_broadcast(&_cond);
        pthread_mutex_unlock(&_mutex);
    }
};

В более широком смысле, я бы не назвал это FIFO Mutex, потому что создается впечатление, что он поддерживает порядок, которого прежде нет. Если ваши потоки параллельно вызывают lock (), у них не может быть порядка перед вызовом блокировки, поэтому нет смысла создавать мьютекс, сохраняющий отношение порядка, которого там нет.

1 голос
/ 08 апреля 2011

Решением может быть использование атомарных операций . Без блокировки, без переключения контекста, без сна и намного быстрее, чем мьютексы или условные переменные. Атомные операции не являются окончательным решением для всего, но мы создали много поточно-ориентированных версий общих структур данных, используя только атомарные операции. Они очень быстрые.

Атомные операции - это серия простых операций, таких как увеличение, уменьшение или назначение, которые гарантированно выполняются атомарно в многопоточной среде. Если два потока попадают в операцию одновременно, процессор гарантирует, что один поток выполняет операцию одновременно. Атомные операции - это аппаратные инструкции, поэтому они быстрые «Сравнить и поменять» очень полезно для многопоточных структур данных. В нашем тестировании атомарное сравнение и свопирование выполняются примерно с 32-битным целочисленным назначением. Может быть, в 2 раза медленнее. Если учесть, сколько процессоров потребляется мьютексами, атомные операции выполняются бесконечно быстрее.

Это не тривиально, чтобы сделать вращения, чтобы сбалансировать ваше дерево с атомарными операциями, но не невозможно. Я сталкивался с этим требованием в прошлом и обманул, сделав потокобезопасным skiplist , так как с атомарными операциями можно очень просто создать skiplist. Извините, я не могу дать вам копию нашего кода ... моя компания уволит меня, но это достаточно легко сделать самостоятельно.

Как атомарные операции позволяют создавать структуры данных без блокировок, можно визуализировать на примере простого связанного с потоком безопасного списка. Чтобы добавить элемент в глобальный связанный список (_pHead) без использования блокировок. Сначала сохраните копию _pHead, pOld. Я думаю об этих копиях как о «состоянии мира» при выполнении одновременных операций. Затем создайте новый узел pNew и установите для его pNext значение COPY . Затем используйте атомарное «сравнить и поменять», чтобы изменить _pHead на pNew ТОЛЬКО ЕСЛИ pHead ЕЩЕ ПРОСТО. Атомная операция будет успешной, только если _pHead не изменился. Если это не удалось, вернитесь назад, чтобы получить копию нового _pHead, и повторите.

Если операция прошла успешно, остальной мир теперь увидит новую голову. Если поток получил старую голову за наносекунду раньше, этот поток не увидит новый элемент, но список все равно будет безопасным для перебора. Поскольку мы предварительно установили pNext на старую головку, ДО того, как мы добавили наш новый элемент в список, если поток увидит новую головку через наносекунду после того, как мы добавим ее, список будет безопасным для перемещения.

Глобальный материал:

typedef struct _TList {
  int data;
  struct _TList *pNext;
} TList;

TList *_pHead;

Добавить в список:

TList *pOld, *pNew;
...
// allocate/fill/whatever to make pNew
...
while (1) { // concurrency loop
  pOld = _pHead;  // copy the state of the world. We operate on the copy
  pNew->pNext = pOld; // chain the new node to the current head of recycled items
  if (CAS(&_pHead, pOld, pNew))  // switch head of recycled items to new node
    break; // success
}

CAS является сокращением для __ sync_bool_compare_and_swap и т.п. Видишь как легко? Нет мьютексов ... нет замков! В том редком случае, когда 2 потока попадают в этот код одновременно, один просто повторяется во второй раз. Мы видим только второй цикл, потому что планировщик меняет поток, находясь в цикле параллелизма. так что это редко и несущественно.

Вещи можно снять с головы связанного списка аналогичным образом. Вы можете атомарно изменить более одного значения, если вы используете объединения и можете использовать до 128-битных атомарных операций. Мы протестировали 128 бит на 32-битной Redhat Linux, и они имеют такую ​​же скорость, что и 32, 64-битные атомные операции.

Вы должны выяснить, как использовать этот тип техники с вашим деревом. Узел дерева b будет иметь два ptrs для дочерних узлов. Вы можете CAS их, чтобы изменить их. Проблема балансировки сложная. Я вижу, как вы можете проанализировать ветвь дерева, прежде чем что-то добавить и сделать копию ветви с определенной точки. Когда вы заканчиваете менять ветку, вы вводите новую CAS. Это будет проблемой для больших веток. Может быть, балансировка может быть сделана «позже», когда потоки не борются за дерево. Может быть, вы можете сделать так, чтобы дерево было доступно для поиска, даже если вы не каскадировали вращение полностью ... другими словами, если поток A добавил узел и рекурсивно вращает узлы, поток b все еще может читать или добавлять узлы. Просто несколько идей. В некоторых случаях мы создаем структуру, которая имеет номера версий или флаги блокировки в 32 битах после 32 битов pNext. Тогда мы используем 64-битный CAS. Возможно, вы могли бы сделать дерево безопасным для чтения в любое время без блокировок, но вам, возможно, придется использовать технику управления версиями для ветви, которая изменяется.

Вот несколько постов, которые я сделал, рассказывающих о преимуществах атомных операций:

Потоки и мьютексы; запирающая часть массива

Эффективный и быстрый способ для аргумента потока

Конфигурация автоматической перезагрузки с помощью pthreads

Преимущества использования условных переменных над мьютексом

манипулирование одним битом

Является ли выделение памяти в linux неблокирующим?

0 голосов
/ 22 марта 2011

Вы можете взглянуть на функцию pthread_mutexattr_setprioceiling.

int pthread_mutexattr_setprioceiling
(
    pthread_mutexatt_t * attr, 
    int prioceiling,
    int * oldceiling
);

Из документации:

pthread_mutexattr_setprioceiling (3THR) устанавливает атрибут потолка приоритета атрибута мьютексаobject.

attr указывает на объект атрибута мьютекса, созданный более ранним вызовом pthread_mutexattr_init ().

prioceiling определяет потолок приоритета инициализированных мьютексов.Потолок определяет минимальный уровень приоритета, при котором выполняется критический участок, защищенный мьютексом.prioceiling будет в пределах максимального диапазона приоритетов, определенных SCHED_FIFO.Чтобы избежать инверсии приоритета, prioceiling будет установлен приоритет выше или равен наивысшему приоритету всех потоков, которые могут заблокировать конкретный мьютекс.

oldceiling содержит старое значение потолка приоритета.

...