поток windows c ++, ожидающий очереди данных - PullRequest
6 голосов
/ 09 сентября 2011

Моя программа настроена следующим образом:
Существует потокобезопасный класс очереди, один поток помещает в него данные, сидя в бесконечном цикле, а второй поток выталкивает данные из него, сидя в бесконечном цикле.Я пытаюсь придумать, как использовать события Windows или какой-то другой механизм, чтобы сделать thread_1 (ниже), ждать в бесконечном цикле while и выполнять итерацию, только когда глубина очереди больше или равна 1.

class thread-safe_Queue
{
 public:
  push();
  pop();
};

DWORD thread_1()
{
 while(1)
 {
  // wait for thread-safe queue to have data on it
  // pop data off
  // process data
 }
}

DWORD thread_2()
{
 while(1)
 {
  // when data becomes available, push data onto thread-safe queue
 }
}

Ответы [ 3 ]

2 голосов
/ 14 сентября 2011

Я думаю, что это может сработать.Производный класс Event и перегрузка функции Process ().

#include <process.h> // Along with all the normal windows includes

//*********************************************
using namespace os;

Mutex globalQueueMutex;

class QueueReader : public Event
{
public:
    virtual void Process()
    {
      // Lock the queue
      Locker l(globalQueueMutex);
      // pop data off
      // process data
      return; // queue will automatically unlock
    }
};

QueueReader myQueueReader;

//*********************************************
// The queue writer would have functions like :
void StartQueueReader()
{
    Thread(QueueReader::StartEventHandler, &myQueueReader);
}
void WriteToQueue()
{
    Locker l(globalQueueMutex);
    // write to the queue
    myQueueReader.SignalProcess(); // tell reader to wake up
}
// When want to shutdown
void Shutdown()
{
    myQueueReader.SignalShutdown();
}

Вот классы, которые выполняют магию.

namespace os {

// **********************************************************************
/// Windows implementation to spawn a thread.
static uintptr_t Thread (void (*StartAddress)(void *), void *ArgList)
{
  return _beginthread(StartAddress, 0, ArgList);
}

// **********************************************************************
/// Windows implementation of a critical section.
class Mutex
{
public:
  // Initialize section on construction
  Mutex() { InitializeCriticalSection( &cs_ ); }
  // Delete section on destruction
  ~Mutex() { DeleteCriticalSection( &cs_ ); }
  // Lock it
  void lock() { EnterCriticalSection( &cs_ ); }
  // Unlock it
  void unlock() { LeaveCriticalSection( &cs_ ); }

private:
  CRITICAL_SECTION cs_;
}; // class Mutex

/// Locks/Unlocks a mutex
class Locker
{
public:
  // Lock the mutex on construction
  Locker( Mutex& mutex ): mutex_( mutex ) { mutex_.lock(); }
  // Unlock on destruction
  ~Locker() { mutex_.unlock(); }
private:
  Mutex& mutex_;
}; // class Locker

// **********************************************************************
// Windows implementation of event handler
#define ProcessEvent  hEvents[0]
#define SetTimerEvent hEvents[1]
#define ShutdownEvent hEvents[2]

/// Windows implementation of events
class Event
{
  /// Flag set when shutdown is complete
  bool Shutdown;
  /// Max time to wait for events
  DWORD Timer;
  /// The three events  - process, reset timer, and shutdown
  HANDLE hEvents[3];

public:
  /// Timeout is disabled by default and Events assigned
  Event( DWORD timer = INFINITE) : Timer(timer)
  {
    Shutdown = false;
    ProcessEvent = CreateEvent( NULL,TRUE,FALSE,NULL );
    SetTimerEvent = CreateEvent( NULL,TRUE,FALSE,NULL );
    ShutdownEvent = CreateEvent( NULL,TRUE,FALSE,NULL );
  }

  /// Close the event handles
  virtual ~Event()
  {
    CloseHandle(ProcessEvent);
    CloseHandle(SetTimerEvent);
    CloseHandle(ShutdownEvent);
  }

  /// os::Thread calls this to start the Event handler
  static void StartEventHandler(void *pMyInstance)
    { ((Event *)pMyInstance)->EventHandler(); }
  /// Call here to Change/Reset the timeout timer
  void ResetTimer(DWORD timer)  { Timer = timer; SetEvent(SetTimerEvent); }
  /// Set the signal to shutdown the worker thread processing events
  void SignalShutdown() { SetEvent(ShutdownEvent); while (!Shutdown) Sleep(30);}
  /// Set the signal to run the process
  void SignalProcess() { SetEvent(ProcessEvent); }

protected:
  /// Overload in derived class to process events with worker thread
  virtual void Process(){}
  /// Override to process timeout- return true to terminate thread
  virtual bool Timeout(){ return true;}

  /// Monitor thread events
  void EventHandler()
  {
    DWORD WaitEvents;
    while (!Shutdown)
    {
      // Wait here, looking to be signaled what to do next
      WaitEvents = WaitForMultipleObjects(3, hEvents, FALSE, Timer);

      switch (WaitEvents)
      {
        // Process event - process event then reset for the next one
        case WAIT_OBJECT_0 + 0:
          Process();
          ResetEvent(ProcessEvent);
          break;

        // Change timer event - see ResetTimer(DWORD timer)
        case WAIT_OBJECT_0 + 1:
          ResetEvent(SetTimerEvent);
          continue;

        // Shutdown requested so exit this thread
        case WAIT_OBJECT_0 + 2:
          Shutdown = true;
          break;

        // Timed out waiting for an event
        case WAIT_TIMEOUT:
          Shutdown = Timeout();
          break;

        // Failed - should never happen
        case WAIT_FAILED:
          break;

        default:
          break;
      }
    }
  }


};

} // namespace os
0 голосов
/ 09 сентября 2011

Вы можете использовать именованные события.Каждый поток будет вызывать CreateEvent, передавая то же имя.Затем используйте WaitForMultipleObjects для ожидания события, связанного с очередью, или события завершения программы.Всплывающий поток будет ожидать событий queue_has_data и end_program.Поток push-уведомлений будет ожидать событий data_available и end_program и устанавливать событие queue_has_data, когда он помещает что-либо в очередь.

0 голосов
/ 09 сентября 2011

Как насчет этого (я полагаю, вы знакомы с механизмом событий).

1.

thread_safe_Queue::push(something)
{
// lock the queue
...
// push object
// Signal the event
SetEvent(notification);

// unlock the queue
}

2.

thread_safe_Queue::pop(something)
{
WaitForSingleObject(notification);
// lock the queue
...
// get object
// reset the event
if (queue is empty)
  ResetEvent(notification);

// unlock the queue
}

3.thread_1 просто пытается вытолкнуть объект и обработать его.Когда что-то выдвигается, событие включено, поэтому pop может быть успешно вызвано.В противном случае он будет ждать внутри pop.На самом деле вы можете использовать другие синхронизирующие объекты, такие как мьютексы или критические секции, вместо событий в этом случае.

ОБНОВЛЕНИЕ.Внешние события: поток 1:

void thread_1()
  {
  while(1)
    {
    WaitForSingleObject(notification);
    if (!pop(object))  // pop should return if there are any objects left in queue
      SetEvent(notification);    
    }
  }

поток_2

void thread_2()
  {
  while(1)
    {
    // push the object and than signal event
    ResetEvent(notification)
    }
  }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...