Как сделать так, чтобы потоки Win32 / MFC зацикливались? - PullRequest
5 голосов
/ 09 февраля 2011

Я новичок в многопоточности в Windows, так что это может быть тривиальный вопрос: какой самый простой способ убедиться, что потоки выполняют цикл в lockstep?

Я попытался передать общий массив Event s для всех потоков и использование WaitForMultipleObjects в конце цикла для их синхронизации, но это дает мне тупик после одного, иногда двух циклов.Вот упрощенная версия моего текущего кода (всего с двумя потоками, но я бы хотел сделать его масштабируемым):

typedef struct
{
    int rank;
    HANDLE* step_events;
} IterationParams;

int main(int argc, char **argv)
{
    // ...

    IterationParams p[2];
    HANDLE step_events[2];
    for (int j=0; j<2; ++j)
    {
        step_events[j] = CreateEvent(NULL, FALSE, FALSE, NULL);
    }

    for (int j=0; j<2; ++j)
    {
        p[j].rank = j;
        p[j].step_events = step_events;
        AfxBeginThread(Iteration, p+j);
    }

    // ...
}

UINT Iteration(LPVOID pParam)
{
    IterationParams* p = (IterationParams*)pParam;
    int rank = p->rank;

    for (int i=0; i<100; i++)
    {
        if (rank == 0)
        {
            printf("%dth iteration\n",i);
            // do something
            SetEvent(p->step_events[0]);
            WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
        }
        else if (rank == 1)
        {
            // do something else
            SetEvent(p->step_events[1]);
            WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
        }
    }
    return 0;
}

(я знаю, что я смешиваю C и C ++, на самом деле это устаревший код C)что я пытаюсь распараллелить.)

Читая документы в MSDN, я думаю, это должно работать.Тем не менее, поток 0 печатается только один раз, иногда дважды, и затем программа зависает.Это правильный способ синхронизации потоков?Если нет, что бы вы порекомендовали (разве в MFC нет встроенной поддержки для барьера?).


EDIT : это решение НЕПРАВИЛЬНО даже включая исправление Алессандро .Например, рассмотрим следующий сценарий:

  1. Поток 0 устанавливает свое событие и вызывает Wait, блоки
  2. Поток 1 устанавливает свое событие и вызывает Wait, blocks
  3. Поток 0возвращается из Wait, сбрасывает свое событие и завершает цикл без получения потоком 1 элемента управления
  4. Thread 0 устанавливает свое собственное событие и вызывает Wait.Поскольку у потока 1 еще не было возможности сбросить событие, ожидание потока 0 немедленно возвращается, и потоки перестают синхронизироваться.

Таким образом, остается вопрос: как обеспечить безопасность что нити остаются на месте?

Ответы [ 3 ]

4 голосов
/ 09 февраля 2011

Введение

Я реализовал простую программу на C ++ для вашего рассмотрения (протестировано в Visual Studio 2010). Он использует только Win32 API (и стандартную библиотеку для вывода на консоль и немного рандомизации). Вы должны иметь возможность добавить его в новый консольный проект Win32 (без предварительно скомпилированных заголовков), скомпилировать и запустить.


Решение

#include <tchar.h>
#include <windows.h>


//---------------------------------------------------------
// Defines synchronization info structure. All threads will
// use the same instance of this struct to implement randezvous/
// barrier synchronization pattern.
struct SyncInfo
{
    SyncInfo(int threadsCount) : Awaiting(threadsCount), ThreadsCount(threadsCount), Semaphore(::CreateSemaphore(0, 0, 1024, 0)) {};
    ~SyncInfo() { ::CloseHandle(this->Semaphore); }
    volatile unsigned int Awaiting; // how many threads still have to complete their iteration
    const int ThreadsCount;
    const HANDLE Semaphore;
};


//---------------------------------------------------------
// Thread-specific parameters. Note that Sync is a reference
// (i.e. all threads share the same SyncInfo instance).
struct ThreadParams
{
    ThreadParams(SyncInfo &sync, int ordinal, int delay) : Sync(sync), Ordinal(ordinal), Delay(delay) {};
    SyncInfo &Sync;
    const int Ordinal;
    const int Delay;
};


//---------------------------------------------------------
// Called at the end of each itaration, it will "randezvous"
// (meet) all the threads before returning (so that next
// iteration can begin). In practical terms this function
// will block until all the other threads finish their iteration.
static void RandezvousOthers(SyncInfo &sync, int ordinal)
{
    if (0 == ::InterlockedDecrement(&(sync.Awaiting))) { // are we the last ones to arrive?
        // at this point, all the other threads are blocking on the semaphore
        // so we can manipulate shared structures without having to worry
        // about conflicts
        sync.Awaiting = sync.ThreadsCount;
        wprintf(L"Thread %d is the last to arrive, releasing synchronization barrier\n", ordinal);
        wprintf(L"---~~~---\n");

        // let's release the other threads from their slumber
        // by using the semaphore
        ::ReleaseSemaphore(sync.Semaphore, sync.ThreadsCount - 1, 0); // "ThreadsCount - 1" because this last thread will not block on semaphore
    }
    else { // nope, there are other threads still working on the iteration so let's wait
        wprintf(L"Thread %d is waiting on synchronization barrier\n", ordinal);
        ::WaitForSingleObject(sync.Semaphore, INFINITE); // note that return value should be validated at this point ;)
    }
}


//---------------------------------------------------------
// Define worker thread lifetime. It starts with retrieving
// thread-specific parameters, then loops through 5 iterations
// (randezvous-ing with other threads at the end of each),
// and then finishes (the thread can then be joined).
static DWORD WINAPI ThreadProc(void *p)
{
    ThreadParams *params = static_cast<ThreadParams *>(p);
    wprintf(L"Starting thread %d\n", params->Ordinal);

    for (int i = 1; i <= 5; ++i) {
        wprintf(L"Thread %d is executing iteration #%d (%d delay)\n", params->Ordinal, i, params->Delay);
        ::Sleep(params->Delay); 
        wprintf(L"Thread %d is synchronizing end of iteration #%d\n", params->Ordinal, i);
        RandezvousOthers(params->Sync, params->Ordinal);
    }

    wprintf(L"Finishing thread %d\n", params->Ordinal);
    return 0;
}


//---------------------------------------------------------
// Program to illustrate iteration-lockstep C++ solution.
int _tmain(int argc, _TCHAR* argv[])
{
    // prepare to run
    ::srand(::GetTickCount()); // pseudo-randomize random values :-)
    SyncInfo sync(4);
    ThreadParams p[] = {
        ThreadParams(sync, 1, ::rand() * 900 / RAND_MAX + 100), // a delay between 200 and 1000 milliseconds will simulate work that an iteration would do
        ThreadParams(sync, 2, ::rand() * 900 / RAND_MAX + 100),
        ThreadParams(sync, 3, ::rand() * 900 / RAND_MAX + 100),
        ThreadParams(sync, 4, ::rand() * 900 / RAND_MAX + 100),
    };

    // let the threads rip
    HANDLE t[] = {
        ::CreateThread(0, 0, ThreadProc, p + 0, 0, 0),
        ::CreateThread(0, 0, ThreadProc, p + 1, 0, 0),
        ::CreateThread(0, 0, ThreadProc, p + 2, 0, 0),
        ::CreateThread(0, 0, ThreadProc, p + 3, 0, 0),
    };

    // wait for the threads to finish (join)
    ::WaitForMultipleObjects(4, t, true, INFINITE);

    return 0;
}

Пример вывода

Запуск этой программы на моей машине (двухъядерный) дает следующий вывод:

Starting thread 1
Starting thread 2
Starting thread 4
Thread 1 is executing iteration #1 (712 delay)
Starting thread 3
Thread 2 is executing iteration #1 (798 delay)
Thread 4 is executing iteration #1 (477 delay)
Thread 3 is executing iteration #1 (104 delay)
Thread 3 is synchronizing end of iteration #1
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #1
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #1
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #1
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 2 is executing iteration #2 (798 delay)
Thread 3 is executing iteration #2 (104 delay)
Thread 1 is executing iteration #2 (712 delay)
Thread 4 is executing iteration #2 (477 delay)
Thread 3 is synchronizing end of iteration #2
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #2
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #2
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #2
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 4 is executing iteration #3 (477 delay)
Thread 3 is executing iteration #3 (104 delay)
Thread 1 is executing iteration #3 (712 delay)
Thread 2 is executing iteration #3 (798 delay)
Thread 3 is synchronizing end of iteration #3
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #3
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #3
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #3
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 2 is executing iteration #4 (798 delay)
Thread 3 is executing iteration #4 (104 delay)
Thread 1 is executing iteration #4 (712 delay)
Thread 4 is executing iteration #4 (477 delay)
Thread 3 is synchronizing end of iteration #4
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #4
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #4
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #4
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Thread 3 is executing iteration #5 (104 delay)
Thread 4 is executing iteration #5 (477 delay)
Thread 1 is executing iteration #5 (712 delay)
Thread 2 is executing iteration #5 (798 delay)
Thread 3 is synchronizing end of iteration #5
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #5
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #5
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #5
Thread 2 is the last to arrive, releasing synchronization barrier
---~~~---
Finishing thread 4
Finishing thread 3
Finishing thread 2
Finishing thread 1

Обратите внимание, что для простоты каждый поток имеет случайную длительность итерации, но все итерации этого потока будут использовать одну и ту же случайную продолжительность (т. Е. Она не меняется между итерациями).


Как это работает?

«Ядром» решения является функция «RandezvousOthers». Эта функция либо блокирует общий семафор (если поток, в котором была вызвана эта функция, была не последним, вызвавшим эту функцию), либо сбрасывает структуру Sync и разблокирует все потоки, блокирующие общий семафор (если поток, для которого была вызвана функция, которая вызывала функцию последней).

2 голосов
/ 09 февраля 2011

Чтобы все заработало, установите второй параметр CreateEvent на TRUE. Это сделает события «ручным сбросом» и не позволит Waitxxx сбросить его. Затем поместите ResetEvent в начале цикла.

1 голос
/ 09 февраля 2011

Я нашел это SyncTools (скачать SyncTools.zip), прибегая к помощи "окна синхронизации барьера".Он использует один CriticalSection и одно событие для реализации барьера для N потоков.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...