Существуют ли реализации параллельных очередей блокировки без блокировки? - PullRequest
4 голосов
/ 20 апреля 2011

Мне известны блокирующие очереди и очереди без блокировок, отличный пример тех реализаций, предоставляемых Скоттом и др. , но есть ли реализации блокировок без блокировок?очередь?

В очереди без блокировок очередь не требует блокировки, но если в очереди нет элементов, она блокирует потребителя.Есть ли реализации такого зверя?Я предпочитаю, чтобы они были реализациями на C #, но любая реализация технически работала бы.

Обновление:

Я думаю, что в итоге получаю условие гонки на линии D14.1:

initialize(Q: pointer to queue t)
node = new node() // Allocate a free node
node–>next.ptr = NULL // Make it the only node in the linked list
Q–>Head = Q–>Tail = node // Both Head and Tail point to it
signal = new ManualResetEvent() // create a manual reset event

    enqueue(Q: pointer to queue t, value: data type)
E1:     node = new node() // Allocate a new node from the free list
E2:     node–>value = value // Copy enqueued value into node
E3:     node–>next.ptr = NULL // Set next pointer of node to NULL
E4:     loop // Keep trying until Enqueue is done
E5:         tail = Q–>Tail // Read Tail.ptr and Tail.count together
E6:         next = tail.ptr–>next // Read next ptr and count fields together
E7:         if tail == Q–>Tail // Are tail and next consistent?
E8:             if next.ptr == NULL // Was Tail pointing to the last node?
E9:                 if CAS(&tail.ptr–>next, next, <node, next.count+1>) // Try to link node at the end of the linked list
E10.1:                  signal.Set() // Signal to the blocking dequeues
E10.2:                  break // Enqueue is done. Exit loop
E11:                endif
E12:            else // Tail was not pointing to the last node
E13:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Try to swing Tail to the next node
E14:            endif
E15:        endif
E16:     endloop
E17:    CAS(&Q–>Tail, tail, <node, tail.count+1>) // Enqueue is done. Try to swing Tail to the inserted node


    dequeue(Q: pointer to queue t, pvalue: pointer to data type): boolean
D1:     loop // Keep trying until Dequeue is done
D2:         head = Q–>Head // Read Head
D3:         tail = Q–>Tail // Read Tail
D4:         next = head–>next // Read Head.ptr–>next
D5:         if head == Q–>Head // Are head, tail, and next consistent?
D6:             if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7:                 if next.ptr == NULL // Is queue empty?
D8.1:                   signal.WaitOne() // Block until an enqueue
D8.X:                   // remove the return --- return FALSE // Queue is empty, couldn’t dequeue
D9:                 endif
D10:                CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>) // Tail is falling behind. Try to advance it
D11:            else // No need to deal with Tail
                    // Read value before CAS, otherwise another dequeue might free the next node
D12:                *pvalue = next.ptr–>value
D13:                if CAS(&Q–>Head, head, <next.ptr, head.count+1>) // Try to swing Head to the next node
D14.1:                  if(head.ptr == tail.ptr && next.ptr==NULL) // Is queue empty? <--- POSSIBLE RACE CONDITION???
D14.2:                      signal.Reset()
D14.3:                  break // Dequeue is done. Exit loop
D15:                endif
D16:            endif
D17:         endif
D18:    endloop
D19:    free(head.ptr) // It is safe now to free the old dummy node
D20:    return TRUE // Queue was not empty, dequeue succeeded

Ответы [ 2 ]

1 голос
/ 21 апреля 2011

EDIT:

ПРОЩЕ: Я предлагаю вам не нужна голова и хвост для вашей очереди. Просто есть голова. Если head = NULL, список пуст. Добавьте предметы в голову. Удалить предметы из головы. Проще, меньше операций CAS.

ХЕЛПЕР: В комментариях я предложил вам подумать о вспомогательной схеме для управления гонкой. В моей версии, что означает «блокировка свободна», нормально иметь редкие условия гонки, если они не вызывают проблем. Мне нравится дополнительная производительность по сравнению с тем, что бездействующий поток спит пару мс слишком долго.

Вспомогательные идеи. Когда потребитель берет работу, он может проверить, есть ли нить в коме. Когда продюсер добавляет работу, он может искать нити в коме.

Так что следите за шпалами. Используйте связанный список спящих. Когда поток решает, что работы нет, он помечает себя как! Awake и сам CAS, возглавляющий список спящих. Когда получен сигнал на пробуждение, поток помечает себя как пробужденный. Затем вновь пробудившаяся нить очищает список спящих. Чтобы очистить одновременный единый связанный список, вы должны быть осторожны. Вы можете только CAS в голову. Таким образом, пока глава списка спящих помечена как активная, вы можете отключить ее. Если голова не проснулась, продолжайте сканировать список и «ленивая отсоединить» (я придумал этот термин) оставшиеся пробужденные предметы. Lazy unlink - это просто ... просто установите следующий ptr предыдущего предмета поверх пробужденного предмета. Одновременное сканирование все равно попадет в конец списка, даже если оно попадет в объекты, которые находятся в пробуждении. Последующие сканы видят более короткий список. Наконец, каждый раз, когда вы добавляете работу или выполняете работу, отсканируйте список спящих! Если потребитель замечает, что работа остается после захвата некоторой работы (.next work! = NULL), потребитель может просмотреть список спящих и сообщить о первом потоке! После того, как продюсер добавляет работу, продюсер может просмотреть список спящих и сделать то же самое.

Если у вас есть сценарий широковещательной рассылки, и вы не можете сигнализировать об одном потоке, просто сохраняйте количество спящих потоков. Несмотря на то, что это число все еще> 0, потребитель, замечающий оставшуюся работу, и потребитель, добавляющий работу, будут транслировать сигнал на пробуждение.

В нашей среде у нас есть 1 поток на SMT, поэтому список спящих никогда не может быть таким большим (ну, если я не получу в руки одну из этих 128 машин с параллельными потоками!) Мы создаем рабочие элементы на ранних этапах транзакции. В первую секунду мы можем сгенерировать 10000 рабочих элементов, и это производство быстро сойдет на нет. Потоки работают на пару секунд на этих рабочих элементах. Таким образом, у нас редко есть поток в пуле ожидания.

ВЫ МОЖЕТЕ ИСПОЛЬЗОВАТЬ ЗАМКИ Если у вас есть только 1 поток и вы редко создаете работу ... это не сработает для вас. В этом случае производительность мьютексов не имеет значения, и вы должны просто использовать их. Используйте блокировку очереди спящего в этом сценарии. Думайте о свободе от блокировки, как о том, что «никаких замков не существует».

ПРЕДЫДУЩАЯ ПОЧТА: Ты говоришь: Есть очередь на работу. Есть много потребительских тем. Потребитель должен взять работу и сделать это, если есть работа Потребительский поток должен спать, пока не будет работы.

Если вы, мы делаем это, используя только атомарные операции:

Очередь работы представляет собой связанный список. Также есть связанный список спящих тем.

Чтобы добавить работу: CAS глава списка к новой работе. Когда работа добавлена, мы проверяем, есть ли какие-либо темы в списке спящих. Если есть, прежде чем добавлять работу, мы исключаем спящего из списка спящих, устанавливаем его работу = новую работу и затем просим спящего проснуться. Мы добавляем работу в очередь работ.

Чтобы потреблять работу: CAS от главы списка к голове-> далее. Если заголовок рабочего списка равен NULL, мы отправляем поток в список спящих.

Как только у потока есть рабочий элемент, поток должен преобразовать состояние рабочего элемента в WORK_INPROGRESS или что-то подобное. Если это не удается, это означает, что работа выполняется другим, поэтому поток потребителя возвращается к поиску работы. Если поток просыпается и имеет рабочий элемент, он все равно должен сохранять состояние CAS.

Так что, если работа добавляется, спящий потребитель всегда просыпается и вручает работу. pthread_kill () всегда пробуждает поток в sigwait (), потому что даже если поток получает сигнал sigwait после сигнала, сигнал получен. Это решает проблему того, что поток помещает себя в список спящих, но получает сигнал перед сном. Все, что происходит, - поток пытается владеть своей -> работой, если она есть. Если у вас нет собственной работы или нет работы, поток возвращается в режим потребления. Если потоку не удается выполнить CAS в списке спящих, это означает, что его побьет другой поток или что производитель снял спящий. В целях безопасности у нас есть нить, действующая так, как будто она только что проснулась.

У нас нет условий для гонки, у нас есть несколько производителей и потребителей. Мы также смогли расширить это, чтобы позволить нитям спать на отдельных рабочих элементах.

1 голос
/ 20 апреля 2011

.NET параллельные расширения: (Встроенный, для .NET 4.0 +):

http://blogs.msdn.com/b/pfxteam/archive/2010/01/26/9953725.aspx

Кто-то из реализации StackOverflow:

Блокировка свободных конструкций в .net

Ответ на уточнение в комментариях:

Если блокировка, когда пусто не занято (ждет сигнала), то, похоже, вам нужен счетный семафор для ожидания.

Альтернативный подход можетиспользовать обычную очередь вместе с атомарным сравнением и обменом или спин-блокировкой для предотвращения одновременного доступа,затем, если поток потребителя пытается войти, когда очередь пуста, заблокируйте двоичный семафор,если поток провайдера пытается войти, когда очередь пуста, разблокируйте двоичный семафор, чтобы разбудить всех спящих потребителей (и вернуть их в спин-блокировку, чтобы несколько потоков могли войти, только если в очереди достаточно элементов для них).

Например, // псевдокод

/// Non-blocking lock (busy wait)
void SpinLock()
{
    While (CompareAndExchange(myIntegerLock, -1, 0) != 0)
    {
        // wait
    }
}

void UnSpinLock()
{
    Exchange(myIntegerLock, 0);
}

void AddItem(item)
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    queue.Push(item);

    // Unblock any blocked consumers
    if (queue.Count() == 1)
    {
        semaphore.Increase();
    }

    // End of CAS synchronization block
    UnSpinLock();
}

Item RemoveItem()
{
    // Use CAS for synchronization
    SpinLock(); // Non-blocking lock (busy wait)

    // If empty then block
    if (queue.Count() == 0)
    {
        // End of CAS synchronization block
        UnSpinLock();

        // Block until queue is not empty
        semaphore.Decrease();

        // Try again (may fail again if there is more than one consumer)
        return RemoveItem();
    }

    result = queue.Pop();

    // End of CAS synchronization block
    UnSpinLock();

    return result;
}
...