ReleaseSemaphore не выпускает семафор - PullRequest
2 голосов
/ 04 марта 2010

(Короче говоря: WaitForSingleObject main () зависает в программе ниже).

Я пытаюсь написать кусок кода, который отправляет потоки и ожидает их завершения, прежде чем возобновится. Вместо того, чтобы создавать потоки каждый раз, что дорого, я усыпляю их. Основной поток создает потоки X в состоянии CREATE_SUSPENDED.

Синхронизация выполняется семафором с X в качестве MaximumCount. Счетчик семафора обнуляется, и потоки отправляются. Треды выполняют какой-то глупый цикл и вызывают ReleaseSemaphore перед тем, как идти спать. Затем основной поток использует X раз WaitForSingleObject, чтобы убедиться, что каждый поток завершил свою работу и находится в спящем режиме. Затем он зацикливается и делает все снова.

Время от времени программа не выходит. Когда я запускаю программу, я вижу, что WaitForSingleObject зависает. Это означает, что ReleaseSemaphore потока не работал. Ничто не напечатано, так что, предположительно, ничего не пошло не так.

Возможно, два потока не должны вызывать ReleaseSemaphore в одно и то же время, но это сведет на нет назначение семафоров ...

Я просто не впадаю в это ...

Другие решения для синхронизации потоков с благодарностью приняты!

#define TRY  100
#define LOOP 100

HANDLE *ids;
HANDLE semaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{ 
 float x = 1.0f;   
 while(1)
 { 
  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)
   printf(" ReleaseSemaphore error : %d ", GetLastError());
  SuspendThread(ids[(int) lpParameter]);
 }
 return (DWORD)(int)x;
}

int main()
{
 SYSTEM_INFO sysinfo;
 GetSystemInfo( &sysinfo );
 int numCPU = sysinfo.dwNumberOfProcessors;

 semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL);
 ids = new HANDLE[numCPU];

 for (int j=0 ; j<numCPU ; j++)
  ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL);

 for (int j=0 ; j<TRY ; j++)
 {
  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }
  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);
 }
 CloseHandle(semaphore);
 printf("Done\n");
 getc(stdin);
}

Ответы [ 5 ]

4 голосов
/ 04 марта 2010

Вместо того, чтобы использовать семафор (по крайней мере напрямую) или явно вызывать main, чтобы выполнить работу, я всегда использовал потокобезопасную очередь. Когда main хочет, чтобы рабочий поток что-то сделал, он помещает описание выполняемой работы в очередь. Каждый из рабочих потоков просто выполняет задание, затем пытается извлечь другое задание из очереди и в итоге приостанавливается до тех пор, пока в очереди не будет задания для него:

Код для очереди выглядит так:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);       
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    T pop() { 
        WaitForSingleObject(data_avail,INFINITE);
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif

И грубый эквивалент вашего кода в потоках для его использования выглядит примерно так. Я не разобрался в точности, что делала ваша функция потока, но это было что-то с суммированием квадратных корней, и, очевидно, вас больше интересует синхронизация потока, чем то, что фактически делают потоки.

Редактировать: (на основании комментария): Если вам нужно main() дождаться завершения некоторых задач, сделать больше работы, а затем назначить больше задач, как правило, лучше всего справиться с этим, поместив событие (например) в каждую задачу, и ваша функция потока задает события , Пересмотренный код для этого будет выглядеть следующим образом (обратите внимание, что код очереди не затронут):

#include "queue.hpp"

#include <iostream>
#include <process.h>
#include <math.h>
#include <vector>

struct task { 
    int val;
    HANDLE e;

    task() : e(CreateEvent(NULL, 0, 0, NULL)) { }
    task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {}
};

void process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p);

    task t;
    while ( -1 != (t=q.pop()).val) {
        std::cout << t.val << "\n";
        SetEvent(t.e);
    }
}

int main() { 
    queue<task> jobs;

    enum { thread_count = 4 };
    enum { task_count = 10 };

    std::vector<HANDLE> threads;
    std::vector<HANDLE> events;

    std::cout << "Creating thread pool" << std::endl;
    for (int t=0; t<thread_count; ++t)
        threads.push_back((HANDLE)_beginthread(process, 0, &jobs));
    std::cout << "Thread pool Waiting" << std::endl;

    std::cout << "First round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+1);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE);

    events.clear();

    std::cout << "Second round of tasks" << std::endl;

    for (int i=0; i<task_count; ++i) {
        task t(i+20);
        events.push_back(t.e);
        jobs.push(t);
    }

    WaitForMultipleObjects(events.size(), &events[0], true, INFINITE);
    events.clear();

    for (int j=0; j<thread_count; ++j)
        jobs.push(-1);

    WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE);

    return 0;
}
3 голосов
/ 04 марта 2010

проблема возникает в следующем случае:

основной поток возобновляет рабочие потоки:

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

рабочие потоки выполняют свою работу и выпускают семафор:

  for (int i=1 ; i<LOOP ; i++)
   x = sqrt((float)i*x);
  while (ReleaseSemaphore(semaphore,1,NULL) == FALSE)

основной поток ждет всех рабочих потоков и сбрасывает семафор:

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);
  ReleaseSemaphore(semaphore,numCPU,NULL);

основной поток переходит в следующий раунд, пытаясь возобновить рабочие потоки (обратите внимание, что рабочие потоки еще не приостановили событие! Вот где проблема начинается ... вы пытаетесь возобновить потоки, которые не т обязательно приостановлено пока):

  for (int i=0 ; i<numCPU ; i++)
  {
   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)
    printf("Timed out !!!\n");
   ResumeThread(ids[i]);  
  }

наконец рабочие потоки приостанавливаются (хотя они уже должны начинать следующий раунд):

  SuspendThread(ids[(int) lpParameter]);

и главный поток ждет вечно, так как все рабочие теперь приостановлены:

  for (int i=0 ; i<numCPU ; i++)
   WaitForSingleObject(semaphore,INFINITE);

вот ссылка, которая показывает, как правильно решать проблемы производителя / потребителя:

http://en.wikipedia.org/wiki/Producer-consumer_problem

также я думаю, критические секции намного быстрее, чем семафоры и мьютексы. их также легче понять в большинстве случаев (imo).

3 голосов
/ 04 марта 2010

Я не понимаю код, но синхронизация потоков определенно плохая. Вы предполагаете, что потоки будут вызывать SuspendThread () в определенном порядке. Успешный вызов WaitForSingleObject () не сообщает вам , какой поток вызвал ReleaseSemaphore (). Таким образом, вы вызовете ReleaseThread () для потока, который не был приостановлен. Это быстро блокирует программу.

Другое неверное предположение состоит в том, что поток уже называется SuspendThread после возврата WFSO. Обычно да, не всегда. Поток может быть прерван сразу после вызова RS. Вы снова вызовете ReleaseThread () для потока, который не был приостановлен. Обычно это занимает один день или около того, чтобы заблокировать вашу программу.

И я думаю, что одного вызова ReleaseSemaphore слишком много. Без сомнения, пытаясь отогнуть его.

Вы не можете управлять потоками с помощью Suspend / ReleaseThread (), не пытайтесь.

0 голосов
/ 04 марта 2010

Вот практическое решение.

Я хотел, чтобы моя основная программа использовала потоки (затем используя более одного ядра) для обработки заданий и ожидания завершения всех потоков, прежде чем возобновить работу и выполнять другие действия. Я не хотел позволить нитям умирать и создавать новые, потому что это медленно. В моем вопросе я пытался сделать это путем приостановки потоков, что казалось естественным. Но, как отметил nobugz, «Вы можете управлять потоками с помощью Suspend / ReleaseThread ()».

Решение включает в себя семафоры, подобные тому, который я использовал для управления потоками. На самом деле еще один семафор используется для управления основным потоком. Теперь у меня есть один семафор на поток для управления потоками и один семафор для управления основным.

Вот решение:

#include <windows.h>
#include <stdio.h>
#include <math.h>
#include <process.h>

#define TRY  500000
#define LOOP 100

HANDLE *ids;
HANDLE *semaphores;
HANDLE allThreadsSemaphore;

DWORD WINAPI Count(__in LPVOID lpParameter)
{   
    float x = 1.0f;         
    while(1)
    {   
        WaitForSingleObject(semaphores[(int)lpParameter],INFINITE);
        for (int i=1 ; i<LOOP ; i++)
            x = sqrt((float)i*x+rand());
        ReleaseSemaphore(allThreadsSemaphore,1,NULL);
    }
    return (DWORD)(int)x;
}

int main()
{
    SYSTEM_INFO sysinfo;
    GetSystemInfo( &sysinfo );
    int numCPU = sysinfo.dwNumberOfProcessors;

    ids = new HANDLE[numCPU];
    semaphores = new HANDLE[numCPU]; 

    for (int j=0 ; j<numCPU ; j++)
    {
        ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, NULL, NULL);
        // Threads blocked until main releases them one by one
        semaphores[j] = CreateSemaphore(NULL, 0, 1, NULL);
    }
    // Blocks main until threads finish
    allThreadsSemaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

    for (int j=0 ; j<TRY ; j++)
    {
        for (int i=0 ; i<numCPU ; i++) // Let numCPU threads do their jobs
            ReleaseSemaphore(semaphores[i],1,NULL);
        for (int i=0 ; i<numCPU ; i++) // wait for numCPU threads to finish
            WaitForSingleObject(allThreadsSemaphore,INFINITE);
    }
    for (int j=0 ; j<numCPU ; j++)
        CloseHandle(semaphores[j]);
    CloseHandle(allThreadsSemaphore);
    printf("Done\n");
    getc(stdin);
}
0 голосов
/ 04 марта 2010

Проблема в том, что вы ждете чаще, чем сигнализируете.

Цикл for (int j=0 ; j<TRY ; j++) ожидает семафора восемь раз, тогда как четыре потока будут сигнализировать только один раз каждый, а сам цикл - один раз. В первый раз в цикле это не проблема, потому что семафор имеет начальный счет четыре. Второй и каждый последующий раз вас ждут слишком много сигналов. Это смягчается тем, что в первые четыре ожидания вы ограничиваете время и не повторяете ошибку. Так что иногда это может сработать, а иногда ваше ожидание зависнет.

Я думаю, что следующие (непроверенные) изменения помогут.

Инициализировать семафор до нуля:

semaphore = CreateSemaphore(NULL, 0, numCPU, NULL);

Избавиться от ожидания в цикле возобновления потока (т.е. удалить следующее):

   if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT)  
      printf("Timed out !!!\n");  

Удалить посторонний сигнал с конца цикла try (т.е. удалить следующее):

ReleaseSemaphore(semaphore,numCPU,NULL);
...