Существует ли набор функций Win32 API для управления синхронизированными очередями? - PullRequest
2 голосов
/ 16 июня 2010

У меня есть программа с несколькими рабочими потоками и основной поток, который получает задания.В главном потоке я хочу поставить задачи в синхронизированную очередь, и рабочие потоки, сидящие там, все ожидают в очереди.Когда в очереди что-то есть, я хочу, чтобы работник вытащил задание из очереди, а оставшаяся работа сидит там в ожидании другого задания.

Я обнаружил CreateMsgQueue (http://msdn.microsoft.com/en-us/library/ms885180.aspx), однако, похоже, это толькосуществует для Windows CE.

Я знаю, что мог бы написать это сам, однако, если бы что-то уже существовало, я был бы дураком, если бы не использовал его.

Я занимаюсь разработкой на c ++ с использованием Visual Studio 2005.

Любые предложения с благодарностью получены.

Спасибо, Рич

1 Ответ

4 голосов
/ 16 июня 2010

Windows не обеспечивает именно то, что вы хотите. Он обеспечивает пулы потоков - с ними вам не только не нужно создавать очередь самостоятельно, но также не нужно создавать или (напрямую) управлять потоками.

Конечно, синхронизированные очереди тоже существуют, но не как часть Windows. Тот, который я написал, выглядит так:

#ifndef QUEUE_H_INCLUDED
#define QUEUE_H_INCLUDED

#include <windows.h>

template<class T, unsigned max = 256>
class queue { 
    HANDLE space_avail; // at least one slot empty
    HANDLE data_avail;  // at least one slot full
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos

    T buffer[max];
    long in_pos, out_pos;
public:
    queue() : in_pos(0), out_pos(0) { 
        space_avail = CreateSemaphore(NULL, max, max, NULL);
        data_avail = CreateSemaphore(NULL, 0, max, NULL);
        InitializeCriticalSection(&mutex);
    }

    void push(T data) { 
        WaitForSingleObject(space_avail, INFINITE);       
        EnterCriticalSection(&mutex);
        buffer[in_pos] = data;
        in_pos = (in_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(data_avail, 1, NULL);
    }

    T pop() { 
        WaitForSingleObject(data_avail,INFINITE);
        EnterCriticalSection(&mutex);
        T retval = buffer[out_pos];
        out_pos = (out_pos + 1) % max;
        LeaveCriticalSection(&mutex);
        ReleaseSemaphore(space_avail, 1, NULL);
        return retval;
    }

    ~queue() { 
        DeleteCriticalSection(&mutex);
        CloseHandle(data_avail);
        CloseHandle(space_avail);
    }
};

#endif
...