Эффективная потребительская нить с несколькими производителями - PullRequest
8 голосов
/ 26 августа 2010

Я пытаюсь сделать ситуацию с потоком производителя / потребителя более эффективной, пропуская дорогостоящие операции события, если необходимо, с чем-то вроде:

//cas(variable, compare, set) is atomic compare and swap
//queue is already lock free

running = false


// dd item to queue – producer thread(s)

if(cas(running, false, true))
{
  // We effectively obtained a lock on signalling the event
  add_to_queue()
  signal_event()
}
else
{
  // Most of the time if things are busy we should not be signalling the event
  add_to_queue()

  if(cas(running, false, true))
    signal_event()
}

...

// Process queue, single consumer thread

reset_event()

while(1)
{
  wait_for_auto_reset_event() // Preferably IOCP

  for(int i = 0; i &lt SpinCount; ++i)
    process_queue()

  cas(running, true, false)

  if(queue_not_empty())
    if(cas(running, false, true))
      signal_event()
}

Очевидно, что попытка исправить все это немного сложно (!), Так ли верен приведенный выше псевдокод? Решение, которое сигнализирует о событии больше, чем необходимо, является правильным, но не тем, которое делает это для каждого элемента.

Ответы [ 4 ]

2 голосов
/ 03 марта 2011

Это относится к подкатегории «перестаньте возиться и возвращайтесь к работе», известной как «преждевременная оптимизация». : -)

Если «дорогие» операции с событиями занимают значительную часть времени, ваш дизайн ошибочен, и вместо того, чтобы использовать производителя / потребителя, вы должны использовать критический раздел / мьютекс и просто выполнять работу из вызывающего потока.

Я предлагаю вам подать заявку, если вы действительно обеспокоены.

Обновлен:

Правильный ответ:

Производитель

ProducerAddToQueue(pQueue,pItem){

    EnterCriticalSection(pQueue->pCritSec)
        if(IsQueueEmpty(pQueue)){
            SignalEvent(pQueue->hEvent)
        }

        AddToQueue(pQueue, pItem)
    LeaveCriticalSection(pQueue->pCritSec)
}

Потребитель

nCheckQuitInterval = 100; // Every 100 ms consumer checks if it should quit.

ConsumerRun(pQueue)
{
    while(!ShouldQuit())
    {
        Item* pCurrentItem = NULL;
        EnterCriticalSection(pQueue-pCritSec);
            if(IsQueueEmpty(pQueue))
            {
                ResetEvent(pQueue->hEvent)
            }
            else
            {
                pCurrentItem = RemoveFromQueue(pQueue);
            }
        LeaveCriticalSection(pQueue->pCritSec);

        if(pCurrentItem){
            ProcessItem(pCurrentItem);
            pCurrentItem = NULL;
        }
        else
        {
            // Wait for items to be added.
            WaitForSingleObject(pQueue->hEvent, nCheckQuitInterval);
        }

    }
}

Примечания:

  • Событие является событием ручного сброса.
  • Операции, защищенные критическим разделом, выполняются быстро. Событие устанавливается или сбрасывается только тогда, когда очередь переходит в / из пустого состояния. Он должен быть установлен / сброшен в критической секции, чтобы избежать состояния гонки.
  • Это означает, что критическая секция удерживается только в течение короткого времени. так что раздоры будут редкими.
  • Критические разделы не блокируются, если они не утверждены. Так что переключение контекста будет редким.

Предположения:

  • Это настоящая проблема, а не домашняя работа.
  • Производители и потребители проводят большую часть своего времени, занимаясь другими делами, то есть готовят элементы для очереди, обрабатывают их после удаления из очереди.
  • Если они проводят большую часть времени, выполняя реальные операции с очередями, вам не следует использовать очередь. Я надеюсь, что это очевидно.
0 голосов
/ 08 октября 2010

Почему бы просто не связать bool с событием? Используйте cas, чтобы установить для него значение true, и если cas завершится успешно, сообщите о событии, потому что событие должно быть сброшено. Затем официант может просто снять флаг, прежде чем ждать

bool flag=false;

// producer
add_to_queue();
if(cas(flag,false,true))
{
    signal_event();
}

// consumer
while(true)
{
    while(queue_not_empty())
    {
        process_queue();
    }
    cas(flag,true,false); // clear the flag
    if(queue_is_empty())
        wait_for_auto_reset_event();
}

Таким образом, вы только ждете, если в очереди нет элементов, и вы сигнализируете событие только один раз для каждой партии элементов.

0 голосов
/ 02 декабря 2010

Я полагаю, вы хотите достичь чего-то вроде этого вопроса:
Многопоточность WinForms: запускать обновление графического интерфейса, только если предыдущее уже закончено. Оно специфично для C # и Winforms, но структура вполне может подойти для вас.

0 голосов
/ 06 октября 2010

Перебрал кучу дел, не вижу проблемы. Но это довольно сложно. Я подумал, что, возможно, у вас возникнут проблемы с гонками queue_not_empty / add_to_queue. Но похоже, что пост-доминирующий CAS в обоих направлениях охватывает этот случай.

CAS стоит дорого (не так дорого, как сигнал). Если вы ожидаете, что пропущенный сигнал будет обычным, я бы кодировал CAS следующим образом:

bool cas(variable, old_val, new_val) {
   if (variable != old_val) return false
   asm cmpxchg
}

Такие структуры без блокировки - вот то, что Jinx (продукт, над которым я работаю) очень хорошо тестирует. Так что вы можете использовать eval-лицензию для тестирования очереди без блокировки и логики оптимизации сигналов.


Редактировать: возможно, вы можете упростить эту логику.

running = false 

// add item to queue – producer thread(s) 
add_to_queue()
if (cas(running, false, true)) {
   signal_event()
}

// Process queue, single consumer thread 

reset_event() 

while(1) 
{ 
    wait_for_auto_reset_event() // Preferably IOCP 

   for(int i = 0; i &lt SpinCount; ++i) 
       process_queue() 

   cas(running, true, false)  // this could just be a memory barriered store of false

   if(queue_not_empty()) 
      if(cas(running, false, true)) 
         signal_event() 
} 

Теперь, когда cas / signal всегда рядом друг с другом, их можно переместить в подпрограмму.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...