форсировать условие синхронизации Синхронизированной очереди в C ++ не уведомляет ожидающий метод класса в другом потоке - PullRequest
0 голосов
/ 30 января 2012

Я пытаюсь реализовать синхронизированную очередь с условными переменными, используя библиотеку повышения потоков, очень похожую на приведенный здесь пример -> ( ImplementingThreadSafeQueue ).

Предпосылки / Цель : Я пишу службу Windows как часть старшего дизайн-проекта. Во всем сервисе мне бы хотелось иметь различные уровни ведения журнала (как для файлов, так и для окна просмотра событий Windows), а также я использую свою собственную оболочку «EventTimer» вокруг функции «CreateTimerQueueTimer» для создания синхронизированных событий, таких как служба, сообщающая о стук сердца. Моя идея состоит в том, чтобы помещать объекты сообщений в синхронизированную очередь и иметь класс регистратора, который наблюдает за очередью в своем собственном потоке, ожидая выполнения различных задач ведения журнала. Для простоты я сейчас тестирую только со строками.

Проблема : Поток регистратора запускает метод, принадлежащий классу журналирования, для извлечения рабочих элементов из очереди. Если я помещаю материал в очередь извне класса, скажем, из потока EventTimer или даже из потока MAIN, регистратор никогда не получает уведомления о новых элементах в очереди. Однако, если я создаю два потока, принадлежащих к классу регистратора, и использую один из этих потоков для помещения чего-либо в очередь, регистратор увидит это и ответит. Я хотел бы, чтобы любой поток мог добавлять материал в очередь и иметь возможность уведомлять регистратор о новых элементах.

Мой код указан ниже. Любая помощь будет оценена. Спасибо за ваше время!

Код синхронизированной очереди

#ifndef _SYNCHRONIZED_QUEUE_
#define _SYNCHRONIZED_QUEUE_

// Include Files
#include <boost\noncopyable.hpp>
#include <boost\thread.hpp>
#include <queue>

namespace GSMV
{

///////////////////////////////////////////////////////////////////////////////////////
/// Class: SynchronizedQueue
///
/// @Brief
/// SynchronizedQueue is a thread safe STL Queue wrapper that waits on Dequeue and 
/// notifies a listening thread on Enqueue. It is not copyable.
///////////////////////////////////////////////////////////////////////////////////////
template <typename T>
class SynchronizedQueue : private boost::noncopyable 
{
    public:
    struct Canceled{};

    ///////////////////////////////////////////////////////////////////////////////////////
    /// Function: Constructor
    ///
    /// @Brief
    /// Default constructor for the SynchronizedQueue object.
    ///////////////////////////////////////////////////////////////////////////////////////
    SynchronizedQueue(void)
    {
      // Queue is not canceled to start with
      this->mCanceled = false;       

      // Nobody waiting yet
      this->mWaiting = 0;
    }

    ///////////////////////////////////////////////////////////////////////////////////////
    /// Function: Enqueue
    /// 
    /// @Param const T &item: Item of type T to add to queue. 
    ///
    /// @Brief
    /// Adds an item of type T to the queue notifying via a condition.
    ///////////////////////////////////////////////////////////////////////////////////////
        void Enqueue(const T &item)
        {
      bool enqueued = false;

      // acquire lock on the queue
      boost::unique_lock<boost::mutex> lock(this->mMutex);

      // make sure the queue is not canceled
      if (this->mCanceled)
        throw Canceled();

      // add item to the queue
            this->mQueue.push(item);

      // notify others that queue has a new item
      this->mItemAvailable.notify_one();
        }

    ///////////////////////////////////////////////////////////////////////////////////////
    /// Function: Dequeue
    /// 
    /// @Return
    /// Item of type T from front of queue. 
    ///
    /// @Brief
    /// Returns an item of type T from the queue and deletes the front of the queue. Thread
    /// will wait on an empty queue until it is signaled via Enqueue.
    ///////////////////////////////////////////////////////////////////////////////////////
        T Dequeue(void)
        {
      // acquire lock on the queue
      boost::unique_lock<boost::mutex> lock(this->mMutex);

      // make sure the queue is not canceled
      if (this->mCanceled)
        throw Canceled();

      // one more thread is waiting on this item
      ++this->mWaiting;

      // if the queue is empty, wait until an item is added
      // lock is released inside the wait
      // lock is re-acquired after the wait
            while (this->mQueue.empty())
              this->mItemAvailable.wait(lock);

      // the thread is done waiting now
      --this->mWaiting;

      // retrieve and remove the item from the queue
            T item = this->mQueue.front();
            this->mQueue.pop();

            return item;
      // lock is released
        }

    ///////////////////////////////////////////////////////////////////////////////////////
    /// Function: GetSize
    /// 
    /// @Return
    /// The current size of the queue (number of items in the queue). 
    ///
    /// @Brief
    /// Returns the number of items contained in the queue.
    ///////////////////////////////////////////////////////////////////////////////////////
        int GetSize(void) 
        {
      // acquire lock on the queue
        boost::unique_lock<boost::mutex> lock(this->mMutex);

      // make sure the queue is not canceled
      if (this->mCanceled)
        throw Canceled();

            return this->mQueue.size();
      // lock is released
        }

    ///////////////////////////////////////////////////////////////////////////////////////
    /// Function: IsEmpty
    /// 
    /// @Return
    /// Boolean queue is empty. 
    ///
    /// @Brief
    /// Returns true if queue is empty false otherwise.
    ///////////////////////////////////////////////////////////////////////////////////////
        bool IsEmpty(void)
        {
      // acquire lock on the queue
        boost::unique_lock<boost::mutex> lock(this->mMutex);

      // make sure the queue is not canceled
      if (this->mCanceled)
        throw Canceled();

            return this->mQueue.empty();
      // lock is released
        }

    void Cancel(void)
    {
      // acquire lock on the queue
        boost::unique_lock<boost::mutex> lock(this->mMutex);

      // make sure the queue is not canceled
      if (this->mCanceled)
        throw Canceled();

      this->mCanceled = true;

      // notify all others that queue has a new item
      this->mItemAvailable.notify_all();

      while (0 < this->mWaiting)
        this->mItemAvailable.wait(lock);
    }

    void Reset(void)
    {
      // acquire lock on the queue
        boost::unique_lock<boost::mutex> lock(this->mMutex);

      // reset the canceled arguement
      this->mCanceled = false;
    }

    private:
    bool mCanceled;
    int mWaiting;
        std::queue<T> mQueue; // the STL Queue
        boost::mutex mMutex;  // the mutex object
        boost::condition_variable mItemAvailable; // the signal condition
};

} // Namespace GSMV


#endif /// _SYNCHRONIZED_QUEUE_

Код регистратора

#ifndef _LOGGER_H_
#define _LOGGER_H_

#include "SynchronizedQueue.h"
#include <string>
#include <boost\thread.hpp>

namespace GSMV
{

static SynchronizedQueue<std::string> logQ;

class Logger
{
  public:
    Logger(void);
    ~Logger(void);

    bool Start(void);
    bool Stop(void);
    bool IsRunning(void) const;
    void LoggerWorkThread(void);

  private:
    boost::thread* mpLoggerThread;
};

} // Namespace GSMV

#endif
// FILE END - logger.h //



#include "Logger.h"

using namespace GSMV;

Logger::Logger(void)
{
  this->mpLoggerThread = NULL;
}

Logger::~Logger(void)
{
  this->Stop();
}

bool Logger::Start(void)
{
  bool started = this->IsRunning();

  if (!started)
  {
    this->mpLoggerThread = new boost::thread(&Logger::LoggerWorkThread, this);
    started = (NULL != this->mpLoggerThread);
  }

  return started;
}

bool Logger::Stop(void)
{
  bool stopped = !this->IsRunning();

  if (!stopped)
  {
    this->mpLoggerThread->interrupt();
    this->mpLoggerThread->join();

    delete this->mpLoggerThread;
    this->mpLoggerThread = NULL;

    stopped = true;
  }

  return stopped;
}

bool Logger::IsRunning(void) const
{
  return (NULL != this->mpLoggerThread);
}

void Logger::LoggerWorkThread(void)
{
  std::cout << "Enter Logger Work Thread\n" << std::endl;

  while (this->IsRunning())
  {
    std::cout << "LOG: wait for Q..." << std::endl;
    std::string s = logQ.Dequeue();
    std::cout << "LOG: Got item! => " << s << std::endl;

    boost::this_thread::interruption_point();
  }

  std::cout << "Exit Logger Work Thread\n" << std::endl;
}

Таким образом, используя приведенный выше код, я бы создал объект logger и вызвал метод Start (). В идеале он запускает новый поток, который зацикливается, проверяя очередь на наличие строковых элементов, пока не будет вызван метод Stop (). Итак, вернувшись в свою основную функцию, я могу поместить строки в очередь, и регистратор должен их получить, но регистратор никогда не получит уведомление. Если это имеет значение, очередь объявляется в заголовочном файле Logger как «static SynchronizedQueue logQ». Опять же, я был бы признателен за любые предложения здесь. Спасибо!

1 Ответ

2 голосов
/ 30 января 2012

Вы должны разблокировать мьютекс перед вызовом notify_one или notify_all для переменной условия.

...