Повысить взаимоблокировку состояния с помощью wait () в коде производителя-потребителя. - PullRequest
4 голосов
/ 14 октября 2010

Я реализовал базовый многопоточный производитель-потребитель (поток 1 = производитель, поток 2 = потребитель), используя потоки и условия Boost.Я застреваю в wait () на неопределенное время довольно часто.Я действительно не вижу, что здесь может быть не так.Ниже приведен псевдокод:

// main class
class Main {
public:
  void AddToQueue(...someData...)
  {
    boost::mutex::scoped_lock lock(m_mutex);
    m_queue.push_back(new QueueItem(...someData...));
    m_cond.notify_one(); 
  }

  void RemoveQueuedItem(...someCond...)
  {
    // i'm wondering if this could cause the trouble?
    boost::mutex::scoped_lock lock(m_mutex);
    // erase a item matching condition (some code not shown,
    // but should be fairly self-explanatory -- IsMatch()
    // simply looks at a flag of QueueItem
    m_queue.erase(std::remove_if(m_queue.being(), m_queue.end(),
      boost::bind(&Main::IsMatch, this, _1, someCond), m_queue.end());
  }

  friend void WorkerThread(Main* m);
private:      
  boost::ptr_deque<QueueItem> m_queue;
  boost::mutex m_mutex;
  boost::condition m_cond;
};

// worker thread
void WorkerThread(Main* m)
{
  typedef boost::ptr_deque<QueueItem>::auto_type RelType;
  RelType queueItem;

  while(!shutDown) {
    { // begin mutex scope
      boost::mutex::scoped_lock lock(m->m_mutex);
      while(m->m_queue.empty()) {
        m->m_cond.wait(lock); // <- stuck here forever quite often!
      }
      queueItem = m->m_queue->pop_front(); // pop & take ptr ownership
    } // end mutex scope

    // ... do stuff with queueItem
    // ...
    // ... queueItem is deleted when it leaves scope & we loop around
  }
}

Дополнительная информация:

  • Использование Boost v1.44
  • Проблема возникает в Linux и Android;Я еще не уверен, происходит ли это в Windows

Есть идеи?

ОБНОВЛЕНИЕ : я верю Я выделил проблему,Я обновлю еще раз после подтверждения, что, надеюсь, будет завтра.

ОБНОВЛЕНИЕ 2 : Оказывается, в коде, описанном выше, нет проблем.Я полагался на базовый API для AddToQueue () - при обработке данных в рабочем потоке и передаче их обратно в API у него была циклическая ошибка, при которой он снова вызывал бы AddToQueue () ..., которая теперь исправлена ​​;-)

Ответы [ 2 ]

2 голосов
/ 14 октября 2010

Недавно я сделал нечто подобное, хотя мой использует очередь STL. Посмотрим, сможете ли вы выбрать из моей реализации. Как говорит wilx , нужно дождаться состояния. Моя реализация имеет максимальный предел для элементов в очереди, и я использую это для ожидания освобождения мьютекса / охранника.

Первоначально я делал это в Windows, имея в виду возможность использования разделов Mutex или Critical, поэтому параметр шаблона можно удалить и использовать boost::mutex напрямую, если он упрощает его для вас.

#include <queue>
#include "Message.h"
#include <boost/thread/locks.hpp>
#include <boost/thread/condition.hpp>

template <typename T> class Queue :  private boost::noncopyable
{
public:
  // constructor binds the condition object to the Q mutex
  Queue(T & mutex, size_t max_size) :  m_max_size(max_size), m_mutex(mutex){}

  // writes messages to end of Q 
  void put(const Message & msg)
  {
    // Lock mutex to ensure exclusive access to Q
    boost::unique_lock<T> guard(m_mutex);

    // while Q is full, sleep waiting until something is taken off of it
    while (m_queue.size() == m_max_size)
    {
      cond.wait(guard);
    }

    // ok, room on the queue. 
    // Add the message to the queue
    m_queue.push(msg);

    // Indicate so data can be ready from Q
    cond.notify_one();
  }

  // Read message from front of Q. Message is removed from the Q
  Message get(void)
  {
    // Lock mutex to ensure exclusive access to Q
    boost::unique_lock<T> guard(m_mutex);

    // If Q is empty, sleep waiting for something to be put onto it
    while (m_queue.empty())
    {
      cond.wait(guard);
    }

    // Q not empty anymore, read the value
    Message msg = m_queue.front();

    // Remove it from the queue
    m_queue.pop();

    // Signal so more data can be added to Q
    cond.notify_one();

    return msg;
  }

  size_t max_size(void) const
  {
    return m_max_size;
  }


private:
  const size_t m_max_size;
  T & m_mutex;
  std::queue<Message> m_queue;
  boost::condition_variable_any cond;
};

Таким образом, вы можете разделить очередь между производителем / потребителем. Пример использования

boost::mutex mutex;

Queue<boost::mutex> q(mutex, 100);

boost::thread_group threads;

threads.create_thread(Producer<boost::mutex>(q));
threads.create_thread(Consumer<boost::mutex>(q));

threads.join_all();

С определением производителя / потребителя, как указано ниже

template <typename T> class Producer
{
public:
   // Queue passed in
   explicit Producer(Queue<T> &q) :  m_queue(q) {}

   void operator()()
   {
   }
}
0 голосов
/ 14 октября 2010
m->m_cond.wait(); // <- stuck here forever quite often!

должно быть:

m->m_cond.wait( lock ); 

Вы мертво заблокировали свой класс, потому что у вас все еще был мьютекс, но вы ждали. Любой другой метод хочет получить тот же мьютекс и ждать вашего работника, который никогда не освободит мьютекс.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...