Я пытаюсь реализовать синхронизированную очередь с условными переменными, используя библиотеку повышения потоков, очень похожую на приведенный здесь пример -> ( ImplementingThreadSafeQueue ).
Предпосылки / Цель : Я пишу службу Windows как часть старшего дизайн-проекта. Во всем сервисе мне бы хотелось иметь различные уровни ведения журнала (как для файлов, так и для окна просмотра событий Windows), а также я использую свою собственную оболочку «EventTimer» вокруг функции «CreateTimerQueueTimer» для создания синхронизированных событий, таких как служба, сообщающая о стук сердца. Моя идея состоит в том, чтобы помещать объекты сообщений в синхронизированную очередь и иметь класс регистратора, который наблюдает за очередью в своем собственном потоке, ожидая выполнения различных задач ведения журнала. Для простоты я сейчас тестирую только со строками.
Проблема : Поток регистратора запускает метод, принадлежащий классу журналирования, для извлечения рабочих элементов из очереди. Если я помещаю материал в очередь извне класса, скажем, из потока EventTimer или даже из потока MAIN, регистратор никогда не получает уведомления о новых элементах в очереди. Однако, если я создаю два потока, принадлежащих к классу регистратора, и использую один из этих потоков для помещения чего-либо в очередь, регистратор увидит это и ответит. Я хотел бы, чтобы любой поток мог добавлять материал в очередь и иметь возможность уведомлять регистратор о новых элементах.
Мой код указан ниже. Любая помощь будет оценена. Спасибо за ваше время!
Код синхронизированной очереди
#ifndef _SYNCHRONIZED_QUEUE_
#define _SYNCHRONIZED_QUEUE_
// Include Files
#include <boost\noncopyable.hpp>
#include <boost\thread.hpp>
#include <queue>
namespace GSMV
{
///////////////////////////////////////////////////////////////////////////////////////
/// Class: SynchronizedQueue
///
/// @Brief
/// SynchronizedQueue is a thread safe STL Queue wrapper that waits on Dequeue and
/// notifies a listening thread on Enqueue. It is not copyable.
///////////////////////////////////////////////////////////////////////////////////////
template <typename T>
class SynchronizedQueue : private boost::noncopyable
{
public:
struct Canceled{};
///////////////////////////////////////////////////////////////////////////////////////
/// Function: Constructor
///
/// @Brief
/// Default constructor for the SynchronizedQueue object.
///////////////////////////////////////////////////////////////////////////////////////
SynchronizedQueue(void)
{
// Queue is not canceled to start with
this->mCanceled = false;
// Nobody waiting yet
this->mWaiting = 0;
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: Enqueue
///
/// @Param const T &item: Item of type T to add to queue.
///
/// @Brief
/// Adds an item of type T to the queue notifying via a condition.
///////////////////////////////////////////////////////////////////////////////////////
void Enqueue(const T &item)
{
bool enqueued = false;
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
// add item to the queue
this->mQueue.push(item);
// notify others that queue has a new item
this->mItemAvailable.notify_one();
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: Dequeue
///
/// @Return
/// Item of type T from front of queue.
///
/// @Brief
/// Returns an item of type T from the queue and deletes the front of the queue. Thread
/// will wait on an empty queue until it is signaled via Enqueue.
///////////////////////////////////////////////////////////////////////////////////////
T Dequeue(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
// one more thread is waiting on this item
++this->mWaiting;
// if the queue is empty, wait until an item is added
// lock is released inside the wait
// lock is re-acquired after the wait
while (this->mQueue.empty())
this->mItemAvailable.wait(lock);
// the thread is done waiting now
--this->mWaiting;
// retrieve and remove the item from the queue
T item = this->mQueue.front();
this->mQueue.pop();
return item;
// lock is released
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: GetSize
///
/// @Return
/// The current size of the queue (number of items in the queue).
///
/// @Brief
/// Returns the number of items contained in the queue.
///////////////////////////////////////////////////////////////////////////////////////
int GetSize(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
return this->mQueue.size();
// lock is released
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: IsEmpty
///
/// @Return
/// Boolean queue is empty.
///
/// @Brief
/// Returns true if queue is empty false otherwise.
///////////////////////////////////////////////////////////////////////////////////////
bool IsEmpty(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
return this->mQueue.empty();
// lock is released
}
void Cancel(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
this->mCanceled = true;
// notify all others that queue has a new item
this->mItemAvailable.notify_all();
while (0 < this->mWaiting)
this->mItemAvailable.wait(lock);
}
void Reset(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// reset the canceled arguement
this->mCanceled = false;
}
private:
bool mCanceled;
int mWaiting;
std::queue<T> mQueue; // the STL Queue
boost::mutex mMutex; // the mutex object
boost::condition_variable mItemAvailable; // the signal condition
};
} // Namespace GSMV
#endif /// _SYNCHRONIZED_QUEUE_
Код регистратора
#ifndef _LOGGER_H_
#define _LOGGER_H_
#include "SynchronizedQueue.h"
#include <string>
#include <boost\thread.hpp>
namespace GSMV
{
static SynchronizedQueue<std::string> logQ;
class Logger
{
public:
Logger(void);
~Logger(void);
bool Start(void);
bool Stop(void);
bool IsRunning(void) const;
void LoggerWorkThread(void);
private:
boost::thread* mpLoggerThread;
};
} // Namespace GSMV
#endif
// FILE END - logger.h //
#include "Logger.h"
using namespace GSMV;
Logger::Logger(void)
{
this->mpLoggerThread = NULL;
}
Logger::~Logger(void)
{
this->Stop();
}
bool Logger::Start(void)
{
bool started = this->IsRunning();
if (!started)
{
this->mpLoggerThread = new boost::thread(&Logger::LoggerWorkThread, this);
started = (NULL != this->mpLoggerThread);
}
return started;
}
bool Logger::Stop(void)
{
bool stopped = !this->IsRunning();
if (!stopped)
{
this->mpLoggerThread->interrupt();
this->mpLoggerThread->join();
delete this->mpLoggerThread;
this->mpLoggerThread = NULL;
stopped = true;
}
return stopped;
}
bool Logger::IsRunning(void) const
{
return (NULL != this->mpLoggerThread);
}
void Logger::LoggerWorkThread(void)
{
std::cout << "Enter Logger Work Thread\n" << std::endl;
while (this->IsRunning())
{
std::cout << "LOG: wait for Q..." << std::endl;
std::string s = logQ.Dequeue();
std::cout << "LOG: Got item! => " << s << std::endl;
boost::this_thread::interruption_point();
}
std::cout << "Exit Logger Work Thread\n" << std::endl;
}
Таким образом, используя приведенный выше код, я бы создал объект logger и вызвал метод Start (). В идеале он запускает новый поток, который зацикливается, проверяя очередь на наличие строковых элементов, пока не будет вызван метод Stop (). Итак, вернувшись в свою основную функцию, я могу поместить строки в очередь, и регистратор должен их получить, но регистратор никогда не получит уведомление. Если это имеет значение, очередь объявляется в заголовочном файле Logger как «static SynchronizedQueue logQ».
Опять же, я был бы признателен за любые предложения здесь. Спасибо!