Причина состояния гонки в стеке производителя / потребителя - PullRequest
1 голос
/ 09 декабря 2011

Я пытался создать стек производителя-потребителя, основанный на событиях уведомлений, который позволял бы одному потоку выдавать данные, а другому потоку выдавать данные.

Когда буфер заполнен / пуст, один поток ожидает другого, пока не сможет продолжить.

Я обнаруживаю состояние гонки (программа прерывается там, где я отметил ***ERROR HERE***), но я не понимаю, почему это может произойти.

Как size может превысить capacity в этой программе?

#include <process.h>
#include <cstdlib>
#include <vector>
#include <windows.h>

template<typename T, typename Ax = std::allocator<T> >
class rwstack
{
    // It is assumed that only ONE thread will push data
    //   and only ONE thread will pop data.

public:
    typedef T value_type;
    typedef Ax allocator_type;
    typedef rwstack<value_type, allocator_type> this_type;
    typedef std::vector<value_type, allocator_type> container_type;

private:
    allocator_type allocator;
    value_type *items;
    size_t volatile count;
    size_t const capacity;
    HANDLE hEventNotEmpty, hEventNotFull;
    rwstack(const this_type &other) { __debugbreak(); /*Don't allow*/ }

public:
    rwstack(const size_t capacity = 4096)
        : allocator(allocator_type()),
        items(allocator.allocate(capacity, NULL)),
        count(0), capacity(capacity),
        hEventNotEmpty(CreateEvent(NULL, TRUE, FALSE, NULL)),
        hEventNotFull(CreateEvent(NULL, TRUE, TRUE, NULL)) { }

    virtual ~rwstack()  // Not actually used in the example
    {
        CloseHandle(hEventNotEmpty);
        CloseHandle(hEventNotFull);
        for (size_t i = 0; i < count; i++)
        { allocator.destroy(&items[InterlockedDecrementSizeT(&count) - i]); }
        allocator.deallocate(items, capacity);
    }

    value_type &push(const value_type &value)
    {
        const ULONG waitResult = WaitForSingleObject(hEventNotFull, INFINITE);
        if (waitResult != WAIT_OBJECT_0) { __debugbreak(); }
        const size_t newSize = InterlockedIncrementSizeT(&count);
        try
        {
            if (newSize > capacity) { __debugbreak(); }  // ****ERROR HERE****
            if (newSize >= capacity) { ResetEvent(hEventNotFull); }
            allocator.construct(&items[newSize - 1], value);
            SetEvent(hEventNotEmpty);
            return items[newSize - 1];
        }
        catch (...) { InterlockedDecrementSizeT(&count); throw; }
    }

    void pop(value_type *pValue = NULL)
    {
        const ULONG waitResult = WaitForSingleObject(hEventNotEmpty, INFINITE);
        if (waitResult != WAIT_OBJECT_0) { __debugbreak(); }
        const size_t newSize = InterlockedDecrementSizeT(&count);
        try
        {
            if (newSize > capacity) { __debugbreak(); }  // ****ERROR HERE****
            if (newSize <= 0) { ResetEvent(hEventNotEmpty); }
            if (pValue != NULL) { *pValue = items[newSize]; }
            allocator.destroy(&items[newSize]);
            SetEvent(hEventNotFull);
        }
        catch (...) { InterlockedIncrementSizeT(&count); throw; }
    }
};

static size_t InterlockedIncrementSizeT(size_t volatile *p)
{
#if _M_X64
    return InterlockedIncrement64(reinterpret_cast<long long volatile *>(p));
#elif _M_IX86
    return InterlockedIncrement(reinterpret_cast<long volatile *>(p));
#endif
}

static size_t InterlockedDecrementSizeT(size_t volatile *p)
{
#if _M_X64
    return InterlockedDecrement64(reinterpret_cast<long long volatile *>(p));
#elif _M_IX86
    return InterlockedDecrement(reinterpret_cast<long volatile *>(p));
#endif
}

Тестовый код:

typedef rwstack<int> TTestStack;

void __cdecl testPush(void *context)
{
    TTestStack::value_type v;
    for (;;)
        static_cast<TTestStack *>(context)->pop(&v);
}

void __cdecl testPop(void *context)
{
    for (TTestStack::value_type v = 0; ; v++)
        static_cast<TTestStack *>(context)->push(v);
}

int main()
{
    TTestStack rw;
    HANDLE hThreads[2] = {
        reinterpret_cast<HANDLE>(_beginthread(&testPush, 0, &rw)),
        reinterpret_cast<HANDLE>(_beginthread(&testPop,  0, &rw)),
    };
    const ULONG nThreads = sizeof(hThreads) / sizeof(*hThreads)
    WaitForMultipleObjects(nThreads, hThreads, TRUE, INFINITE);
    return 0;
}

1 Ответ

3 голосов
/ 09 декабря 2011

Вы не блокируете правильную операцию

Ключевым моментом здесь является то, что, отключая событие hEventNotFull в потоке A, вы также включаете его в потоке B.

Потоки запускаются одновременно

Итак, вот что происходит:

  1. Очередь заполнена на 4096 элементов.

  2. Поток B получает блокировку и уменьшает счет до 4095. Вам нужно удерживать эту блокировку до тех пор, пока вы не решите, следует ли включать hEventNotFull, но вы сразу же отпустите ее. ОС на мгновение приостанавливает поток B.

  3. Поток A получает блокировку и увеличивает счетчик до 4096. Вам нужно удерживать эту блокировку до тех пор, пока вы не решите, следует ли сбрасывать hEventNotFull, но вы немедленно отпустите ее.

  4. ОС решает, что поток B важнее потока A.

  5. Таким образом, вы в итоге вызовете resetEvent в потоке A, а затем SetEvent в потоке B. В результате вы вернетесь к выполнению в потоке A и посчитаете == 4096.

Ход исполнения:

Thread B: Get count and decrement it to 4095.  # Queue not full
Thread A: Get count and increment it to 4096.  # Queue full
Thread A: ResetEvent on `hEventNotFull`        # A thinks it will block since queue is full
Thread B: SetEvent on `hEventNotFull`          # B is using stale info and unblocks A
...