Резьба Безопасное окончание трубы - PullRequest
0 голосов
/ 25 августа 2018

(Примечание перед началом: хотя мой вопрос общий, мой код должен быть скомпилирован с унаследованным приложением Visual Studio 2008 MFC и должен использовать синхронизацию MFC или win32, избегайте ответов, используя, например, boost или c ++ 11)

Я пытаюсь реализовать Thread Safe Pipe (Очередь с одним читателем и одним писателем), я сделал следующее:

template<class T>
class CMultiThreadPipe { 

private:
    HANDLE hSemaphore, hTerminateEvent1, hTerminateEvent2;
    CRITICAL_SECTION listMutex; 
    CList<T*, T*> list;

public:
    CMultiThreadPipe() { 
        InitializeCriticalSection(&listMutex);
        hSemaphore = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
        hTerminateEvent1 = ::CreateEvent(NULL, TRUE, FALSE, NULL); 
        hTerminateEvent2 = ::CreateEvent(NULL, TRUE, FALSE, NULL);
    }

    // pdata must be allocated with new. The dequeueing thread will delete it
    void Enqueue(T* pdata) { 
        EnterCriticalSection(&listMutex);
        list.AddHead(pdata);
        LeaveCriticalSection(&listMutex);
        ReleaseSemaphore(hSemaphore, 1, NULL);
    }

    // if Dequeue returns null it means the pipe was destroyed and no further queue method calls are legal
    // Dequeue caller is responsible to delete the returned instance
    T* Dequeue()
    {
        HANDLE handles[] = { hTerminateEvent1, hSemaphore };
        DWORD waitRes = WaitForMultipleObjects(2, handles, FALSE, INFINITE);
        if (waitRes==WAIT_OBJECT_0) {
            SetEvent(hTerminateEvent2);
            return NULL; // terminated
        }
        EnterCriticalSection(&listMutex);
        T* elem = list.RemoveTail(); 
        LeaveCriticalSection(&listMutex);
        return elem; // handler must delete item
    }

    void Destroy() {
        SetEvent(hTerminateEvent1);
        WaitForSingleObject(hTerminateEvent2, INFINITE);
        EnterCriticalSection(&listMutex);
        POSITION pos = list.GetHeadPosition(); 
        for (int i = 0; i < list.GetCount(); i++) delete list.GetNext(pos); 
        LeaveCriticalSection(&listMutex);
        DeleteCriticalSection(&listMutex);
        CloseHandle(hSemaphore);
    }

    ~CMultiThreadPipe() { 
        Destroy();
    }
};

Код используется следующим образом:

class QueueData {
    public:
        QueueData(int i) : m_data(i) {};
        int m_data;
};

UINT DequeueThreadProc(LPVOID dummy);

CMultiThreadedPipe<QueueData>* pPipe = NULL;

void main() {
    pPipe = new CMultiThreadedPipe<QueueData>();
    start new thread running DequeueThreadProc

    int counter=0;
    for (int counter=0; counter<10; counter++)
    {
        pPipe->Enqueue(new QueueData(counter));
        Sleep(300);
    }
    delete pPipe;
}

UINT DequeueThreadProc(LPVOID ignore)
{
    QueueData* queueData;
    while ((queueData = pPipe->Dequeue()) != NULL) {
        delete queueData;
        Sleep(1000);
    };
    return 0;
}

Проблема, с которой я столкнулся, связана с завершением, в описанной выше реализации, когда канал уничтожается (всегда потоком-инициатором), он ожидает, когда поток-обработчик узнает, что он завершился, перед удалением очереди.,Это должно быть сделано, чтобы предотвратить ситуацию, когда поток блокировки пытается удалить из очереди после разрушения канала.

Если поток удаления из очереди не продолжает вызывать команду dequeue, первый поток будет зависать в деструкторе, в том числе и в случае удаления очереди.Поток ждет много времени между вызовами, чтобы удалить из очереди деструктор первого потока, соответственно, застрял там.Любая помощь приветствуется!

1 Ответ

0 голосов
/ 25 августа 2018

для безопасного уничтожения объекта, доступ к которому осуществляется из нескольких потоков, необходимо использовать подсчет ссылок на него.перед передачей указателя объекта на новый поток - вы увеличиваете ссылку на объект.когда поток больше не использует объект или если создать поток не удается, вы уменьшаете счетчик ссылок.когда последняя ссылка на объект освобождена - вы можете безопасно вызвать деструктор для объекта.и вам не нужно здесь ждать каких-либо тем.

также для реализации такой очереди - в Windows существует специальный объект - с именем Порты завершения ввода / вывода в пространстве пользователя (в пространстве ядра известно как KQUEUE).с этим объектом - реализация будет более эффективной и простой - вам не нужно управлять собственным списком (CList в вашем коде), синхронизировать доступ к нему - все это будет сделано для вас в пространстве ядра (PostQueuedCompletionStatus -> KeInsertQueue, GetQueuedCompletionStatus -> KeRemoveQueue).вам нужно создать только iocp, (kqueue) объект.

class CMultiThreadPipe {

public:

    class __declspec(novtable) QueueData {
    public:

        virtual void ProcessItem() = 0;

        virtual ~QueueData()
        {
            DbgPrint("%x: %s<%p>\n", GetCurrentThreadId(), __FUNCTION__, this);
        }

        QueueData()
        {
            DbgPrint("%x: %s<%p>\n", GetCurrentThreadId(), __FUNCTION__, this);
        }
    };

private:
    HANDLE _hIOCP;
    LONG _dwRef;
    ULONG _nThreads;

    void DequeueThreadProc()
    {
        ULONG NumberOfBytesTransferred;
        QueueData* pData;
        OVERLAPPED* pOverlapped;

        while (GetQueuedCompletionStatus(_hIOCP, 
            &NumberOfBytesTransferred, 
            (ULONG_PTR*)&pData, 
            &pOverlapped, INFINITE))
        {
            if (pData)
            {
                pData->ProcessItem();
            }
            else
            {
                break;
            }
        }

        Release();
    }

    __declspec(noreturn) static DWORD CALLBACK _DequeueThreadProc(PVOID pThis)
    {
        reinterpret_cast<CMultiThreadPipe*>(pThis)->DequeueThreadProc();
        FreeLibraryAndExitThread((HMODULE)&__ImageBase, 0);
    }

    ~CMultiThreadPipe()
    {
        if (_hIOCP)
        {
            CloseHandle(_hIOCP);
        }
    }

public:

    CMultiThreadPipe() : _dwRef(1), _hIOCP(0)
    {
    }

    void AddRef()
    {
        InterlockedIncrement(&_dwRef);
    }

    void Release()
    {
        if (!InterlockedDecrement(&_dwRef))
        {
            delete this;
        }
    }

    ULONG Create(DWORD NumberOfDequeueThreads)
    {
        if (_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, NumberOfDequeueThreads))
        {
            ULONG n = 0;
            do 
            {
                HMODULE hModule;
                if (GetModuleHandleExW(GET_MODULE_HANDLE_EX_FLAG_FROM_ADDRESS, (PCWSTR)_DequeueThreadProc, &hModule))
                {
                    AddRef();

                    if (HANDLE hThread = CreateThread(0, 0, _DequeueThreadProc, this, 0, 0))
                    {
                        CloseHandle(hThread);
                        n++;
                    }
                    else
                    {
                        Release();
                        FreeLibrary(hModule);
                    }
                }

            } while (--NumberOfDequeueThreads);

            _nThreads = n;

            return n ? NOERROR : ERROR_GEN_FAILURE;
        }

        return GetLastError();
    }

    ULONG Enqueue(QueueData* pData)
    {
        return PostQueuedCompletionStatus(_hIOCP, 0, (ULONG_PTR)pData, 0) ? NOERROR : GetLastError();
    }

    void Destroy()
    {
        if (ULONG n = _nThreads)
        {
            do 
            {
                PostQueuedCompletionStatus(_hIOCP, 0, 0, 0);
            } while (--n);
        }
    }
};

и использование:

class QueueData : public CMultiThreadPipe::QueueData
{
    int m_data; 

    virtual void ProcessItem()
    {
        DbgPrint("%x: %s<%p>(%u)\n", GetCurrentThreadId(), __FUNCTION__, this, m_data);
        delete this;
    }
public:
    QueueData(int i) : m_data(i) {};
};

void testQueue()
{
    if (CMultiThreadPipe* pPipe = new CMultiThreadPipe)
    {
        if (pPipe->Create(8) == NOERROR)
        {
            int n = 64;

            do 
            {
                if (QueueData* pData = new QueueData(n))
                {
                    if (pPipe->Enqueue(pData))
                    {
                        delete pData;
                    }
                }
            } while (--n);

            pPipe->Destroy();
        }
        pPipe->Release();
    }
}

note с такими CMultiThreadPipe реализациями - вам не нужно ждать при выходе из рабочих потоков.даже если ваш код внутри dll и вы выгружаете dll - вам не нужно ждать.каждый поток имеет собственную ссылку на объект и модуль.и отпустите его при выходе

...