Я новичок в многопоточности в Windows, так что это может быть тривиальный вопрос: какой самый простой способ убедиться, что потоки выполняют цикл в lockstep?

Я попытался передать общий массив Event s для всех потоков и использование WaitForMultipleObjects в конце цикла для их синхронизации, но это дает мне тупик после одного, иногда двух циклов.Вот упрощенная версия моего текущего кода (всего с двумя потоками, но я бы хотел сделать его масштабируемым):

typedef struct
    int rank;
    HANDLE* step_events;
} IterationParams;

int main(int argc, char **argv)
    // ...

    IterationParams p[2];
    HANDLE step_events[2];
    for (int j=0; j<2; ++j)
        step_events[j] = CreateEvent(NULL, FALSE, FALSE, NULL);

    for (int j=0; j<2; ++j)
        p[j].rank = j;
        p[j].step_events = step_events;
        AfxBeginThread(Iteration, p+j);

    // ...

UINT Iteration(LPVOID pParam)
    IterationParams* p = (IterationParams*)pParam;
    int rank = p->rank;

    for (int i=0; i<100; i++)
        if (rank == 0)
            printf("%dth iteration\n",i);
            // do something
            WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
        else if (rank == 1)
            // do something else
            WaitForMultipleObjects(2, p->step_events, TRUE, INFINITE);
    return 0;

(я знаю, что я смешиваю C и C ++, на самом деле это устаревший код C)что я пытаюсь распараллелить.)

Читая документы в MSDN, я думаю, это должно работать.Тем не менее, поток 0 печатается только один раз, иногда дважды, и затем программа зависает.Это правильный способ синхронизации потоков?Если нет, что бы вы порекомендовали (разве в MFC нет встроенной поддержки для барьера?).

EDIT : это решение НЕПРАВИЛЬНО даже включая исправление Алессандро .Например, рассмотрим следующий сценарий:

  1. Поток 0 устанавливает свое событие и вызывает Wait, блоки
  2. Поток 1 устанавливает свое событие и вызывает Wait, blocks
  3. Поток 0возвращается из Wait, сбрасывает свое событие и завершает цикл без получения потоком 1 элемента управления
  4. Thread 0 устанавливает свое собственное событие и вызывает Wait.Поскольку у потока 1 еще не было возможности сбросить событие, ожидание потока 0 немедленно возвращается, и потоки перестают синхронизироваться.

Таким образом, остается вопрос: как обеспечить безопасность что нити остаются на месте?

Ответы

Я реализовал простую программу на C ++ для вашего рассмотрения (протестировано в Visual Studio 2010). Он использует только Win32 API (и стандартную библиотеку для вывода на консоль и немного рандомизации). Вы должны иметь возможность добавить его в новый консольный проект Win32 (без предварительно скомпилированных заголовков), скомпилировать и запустить.


#include <tchar.h>
#include <windows.h>

// Defines synchronization info structure. All threads will
// use the same instance of this struct to implement randezvous/
// barrier synchronization pattern.
struct SyncInfo
    SyncInfo(int threadsCount) : Awaiting(threadsCount), ThreadsCount(threadsCount), Semaphore(::CreateSemaphore(0, 0, 1024, 0)) {};
    ~SyncInfo() { ::CloseHandle(this->Semaphore); }
    volatile unsigned int Awaiting; // how many threads still have to complete their iteration
    const int ThreadsCount;
    const HANDLE Semaphore;

// Thread-specific parameters. Note that Sync is a reference
// (i.e. all threads share the same SyncInfo instance).
struct ThreadParams
    ThreadParams(SyncInfo &sync, int ordinal, int delay) : Sync(sync), Ordinal(ordinal), Delay(delay) {};
    SyncInfo &Sync;
    const int Ordinal;
    const int Delay;

// Called at the end of each itaration, it will "randezvous"
// (meet) all the threads before returning (so that next
// iteration can begin). In practical terms this function
// will block until all the other threads finish their iteration.
static void RandezvousOthers(SyncInfo &sync, int ordinal)
    if (0 == ::InterlockedDecrement(&(sync.Awaiting))) { // are we the last ones to arrive?
        // at this point, all the other threads are blocking on the semaphore
        // so we can manipulate shared structures without having to worry
        // about conflicts
        sync.Awaiting = sync.ThreadsCount;
        wprintf(L"Thread %d is the last to arrive, releasing synchronization barrier\n", ordinal);

        // let's release the other threads from their slumber
        // by using the semaphore
        ::ReleaseSemaphore(sync.Semaphore, sync.ThreadsCount - 1, 0); // "ThreadsCount - 1" because this last thread will not block on semaphore
    else { // nope, there are other threads still working on the iteration so let's wait
        wprintf(L"Thread %d is waiting on synchronization barrier\n", ordinal);
        ::WaitForSingleObject(sync.Semaphore, INFINITE); // note that return value should be validated at this point ;)

// Define worker thread lifetime. It starts with retrieving
// thread-specific parameters, then loops through 5 iterations
// (randezvous-ing with other threads at the end of each),
// and then finishes (the thread can then be joined).
static DWORD WINAPI ThreadProc(void *p)
    ThreadParams *params = static_cast<ThreadParams *>(p);
    wprintf(L"Starting thread %d\n", params->Ordinal);

    for (int i = 1; i <= 5; ++i) {
        wprintf(L"Thread %d is executing iteration #%d (%d delay)\n", params->Ordinal, i, params->Delay);
        wprintf(L"Thread %d is synchronizing end of iteration #%d\n", params->Ordinal, i);
        RandezvousOthers(params->Sync, params->Ordinal);

    wprintf(L"Finishing thread %d\n", params->Ordinal);
    return 0;

// Program to illustrate iteration-lockstep C++ solution.
int _tmain(int argc, _TCHAR* argv[])
    // prepare to run
    ::srand(::GetTickCount()); // pseudo-randomize random values :-)
    SyncInfo sync(4);
    ThreadParams p[] = {
        ThreadParams(sync, 1, ::rand() * 900 / RAND_MAX + 100), // a delay between 200 and 1000 milliseconds will simulate work that an iteration would do
        ThreadParams(sync, 2, ::rand() * 900 / RAND_MAX + 100),
        ThreadParams(sync, 3, ::rand() * 900 / RAND_MAX + 100),
        ThreadParams(sync, 4, ::rand() * 900 / RAND_MAX + 100),

    // let the threads rip
    HANDLE t[] = {
        ::CreateThread(0, 0, ThreadProc, p + 0, 0, 0),
        ::CreateThread(0, 0, ThreadProc, p + 1, 0, 0),
        ::CreateThread(0, 0, ThreadProc, p + 2, 0, 0),
        ::CreateThread(0, 0, ThreadProc, p + 3, 0, 0),

    // wait for the threads to finish (join)
    ::WaitForMultipleObjects(4, t, true, INFINITE);

    return 0;

Пример вывода

Запуск этой программы на моей машине (двухъядерный) дает следующий вывод:

Starting thread 1
Starting thread 2
Starting thread 4
Thread 1 is executing iteration #1 (712 delay)
Starting thread 3
Thread 2 is executing iteration #1 (798 delay)
Thread 4 is executing iteration #1 (477 delay)
Thread 3 is executing iteration #1 (104 delay)
Thread 3 is synchronizing end of iteration #1
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #1
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #1
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #1
Thread 2 is the last to arrive, releasing synchronization barrier
Thread 2 is executing iteration #2 (798 delay)
Thread 3 is executing iteration #2 (104 delay)
Thread 1 is executing iteration #2 (712 delay)
Thread 4 is executing iteration #2 (477 delay)
Thread 3 is synchronizing end of iteration #2
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #2
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #2
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #2
Thread 2 is the last to arrive, releasing synchronization barrier
Thread 4 is executing iteration #3 (477 delay)
Thread 3 is executing iteration #3 (104 delay)
Thread 1 is executing iteration #3 (712 delay)
Thread 2 is executing iteration #3 (798 delay)
Thread 3 is synchronizing end of iteration #3
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #3
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #3
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #3
Thread 2 is the last to arrive, releasing synchronization barrier
Thread 2 is executing iteration #4 (798 delay)
Thread 3 is executing iteration #4 (104 delay)
Thread 1 is executing iteration #4 (712 delay)
Thread 4 is executing iteration #4 (477 delay)
Thread 3 is synchronizing end of iteration #4
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #4
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #4
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #4
Thread 2 is the last to arrive, releasing synchronization barrier
Thread 3 is executing iteration #5 (104 delay)
Thread 4 is executing iteration #5 (477 delay)
Thread 1 is executing iteration #5 (712 delay)
Thread 2 is executing iteration #5 (798 delay)
Thread 3 is synchronizing end of iteration #5
Thread 3 is waiting on synchronization barrier
Thread 4 is synchronizing end of iteration #5
Thread 4 is waiting on synchronization barrier
Thread 1 is synchronizing end of iteration #5
Thread 1 is waiting on synchronization barrier
Thread 2 is synchronizing end of iteration #5
Thread 2 is the last to arrive, releasing synchronization barrier
Finishing thread 4
Finishing thread 3
Finishing thread 2
Finishing thread 1

Обратите внимание, что для простоты каждый поток имеет случайную длительность итерации, но все итерации этого потока будут использовать одну и ту же случайную продолжительность (т. Е. Она не меняется между итерациями).

Как это работает?

«Ядром» решения является функция «RandezvousOthers». Эта функция либо блокирует общий семафор (если поток, в котором была вызвана эта функция, была не последним, вызвавшим эту функцию), либо сбрасывает структуру Sync и разблокирует все потоки, блокирующие общий семафор (если поток, для которого была вызвана функция, которая вызывала функцию последней).

Чтобы все заработало, установите второй параметр CreateEvent на TRUE. Это сделает события «ручным сбросом» и не позволит Waitxxx сбросить его. Затем поместите ResetEvent в начале цикла.

Я нашел это SyncTools (скачать SyncTools.zip), прибегая к помощи "окна синхронизации барьера".Он использует один CriticalSection и одно событие для реализации барьера для N потоков.

