Безопасная синхронизация потока COM - PullRequest
3 голосов
/ 19 апреля 2011

В прошлом я много работал с многопоточностью, но я довольно новичок в COM.В любом случае, вот моя проблема:

Я создаю рабочий поток, который регистрируется как STA и создает объект COM.Затем рабочий поток и основной поток пытаются связаться друг с другом.Используя CoMarshalInterThreadInterfaceInStream и CoGetInterfaceAndReleaseStream, я могу заставить потоки вызывать методы COM-объектов в другом потоке.

Вот как выглядит рабочий поток:

void workerThread()
{
  CoInitialize(NULL);
  MyLib::IFooPtr foo = ...; // create my COM object

  // Marshall it so the main thread can talk to it
  HRESULT hr = CoMarshalInterThreadInterfaceInStream(foo.GetIID(),
                                                     foo.GetInterfacePtr(),
                                                     &m_stream);
  if (FAILED(hr)) {
    // handle failure
  }

  // begin message loop, to keep this STA alive
  MSG msg;
  BOOL bRet;
  while( (bRet = GetMessage( &msg, NULL, 0, 0 )) != 0)
  { 
    if (bRet == -1)  break;

    DispatchMessage(&msg); 
  }
}

Inосновной поток:

// launch the thread
m_worker = boost::thread (&workerThread);

// get the interface proxy
MyLib::IFooPtr foo;
LPVOID vp (NULL);
HRESULT hr = CoGetInterfaceAndReleaseStream(m_stream, foo.GetIID(), &vp);
if (SUCCEEDED(hr)) foo.Attach(static_cast<MyLib::IFoo*>(vp));

Это создает объект (который требует времени для инициализации) и позволяет основному потоку общаться с ним, и все правильно синхронизируется с компонентом COM Apartment.Насколько я могу судить по чтению msdn, похоже, что это правильный путь.Теперь основной поток может использовать свой прокси для вызова методов моего COM-объекта, и рабочий поток будет получать эти вызовы через очередь сообщений, надлежащим образом распределяя их.

Однако, как насчет синхронизации этих потоков?

Очевидно, что в этом случае я хочу, чтобы основной поток ожидал вызова CoGetInterfaceAndReleaseStream до тех пор, пока рабочий поток не создаст этот поток с помощью CoMarshalInterThreadInterfaceInStream.Но как я могу безопасно это сделать?

Начиная с MSDN , я должен использовать что-то вроде MsgWaitForMultipleObjects, поэтому я могу ждать my_condition ИЛИ new_message_arrived, а затем я могу сделать что-то вроде:

// verbatim from msdn
while (TRUE)
{
   // wait for the event and for messages
   DWORD dwReturn = ::MsgWaitForMultipleObjects(1,
                     &m_hDoneLoading, FALSE, INFINITE, QS_ALLINPUT);

   // this thread has been reawakened. Determine why
   // and handle appropriately.
   if (dwReturn == WAIT_OBJECT_0)
     // our event happened.
     break ;
   else if (dwReturn == WAIT_OBJECT_0 + 1)
   {
     // handle windows messages to maintain
     // client liveness
     MSG msg ;
     while(::PeekMessage(&msg, NULL, 0, 0, PM_REMOVE))
       ::DispatchMessage(&msg) ;
   }
}

Но как мне смешать boost::thread.join() и boost::condition.wait() с MsgWaitForMultipleObjects?Это вообще возможно, или я должен сделать что-то еще, чтобы избежать состояния гонки?

1 Ответ

3 голосов
/ 19 апреля 2011

В вашем основном потоке есть очередь сообщений (должно быть, так как она является хостом STA), почему бы просто не опубликовать на ней сообщение, PostThreadMessage? Отправьте пользовательское сообщение (WM_USER + X), и ваш обычный насос сообщений основного потока может обработать это пользовательское сообщение как уведомление о том, что COM-объект упорядочил интерфейс в потоке, и основной поток безопасно вызвать CoGetInterfaceAndReleaseStream.

Я должен сказать, что с вашим текущим дизайном ваш рабочий поток, по сути, не более чем просто запускает дополнительный насос сообщений. Любой вызов любого метода на вашем интерфейсе из основного потока блокирует, ждет, пока рабочий поток заберет сообщение из своей очереди сообщений, обработает вызов, ответит, и затем основной поток возобновит работу. Все операции будут по крайней мере такими же медленными, как размещение COM-объекта в главном потоке, плюс издержки на COM-маршалинг между двумя STA. По сути, нет никакого параллелизма между двумя потоками из-за того, как работает COM STA. Вы уверены, что это то, что вы хотите?

Редактировать

(без учета множества деталей, таких как количество потоков, обработка тайм-аута, назначение потока / IID / CLSID для каждого работника и т. Д. И т. Д.)

в .ч:

HANDLE m_startupDone;
volatile int m_threadStartCount;

рабочий поток:

void workerThread()
{
  CoInitialize(NULL);
  MyLib::IFooPtr foo = ...; // create my COM object

  // Marshall it so the main thread can talk to it
  HRESULT hr = CoMarshalInterThreadInterfaceInStream(foo.GetIID(),
                                                     foo.GetInterfacePtr(),
                                                     &m_stream);
  if (FAILED(hr)) {
    // handle failure
    // remember to decrement and signal *even on failure*
  }

  if (0 == InterlockedDecrement(&m_threadStartCount))
  {
     SetEvent (m_startupDone);
  } 

  // begin message loop, to keep this STA alive
  MSG msg;
  BOOL bRet;
  while( (bRet = GetMessage( &msg, NULL, 0, 0 )) != 0)
  { 
    if (bRet == -1)  break;

    DispatchMessage(&msg); 
  }
}

в основной теме:

m_startupDone = CreateEvent (NULL, FALSE, FALSE, NULL);
m_threadStartCount = <number of workerthreads>

// launch the thread(s)
m_worker = boost::thread (&workerThread);
m_worker2 = boost::thread (&workerThread);
...

// now wait for tall the threads to create the COM object(s)
if (WAIT_OBJECT0 != WaitForSingleObject(m_startupDone, ...))
{
   // handle failure like timeout
}
// By now all COM objects are guaranteed created and marshaled, unmarshall them all in main
// here must check if all threads actually succeeded (could be as simple as m_stream is not NULL)

// get the interface proxy
MyLib::IFooPtr foo;
LPVOID vp (NULL);
HRESULT hr = CoGetInterfaceAndReleaseStream(m_stream, foo.GetIID(), &vp);
if (SUCCEEDED(hr)) foo.Attach(static_cast<MyLib::IFoo*>(vp));
...