Как заставить его работать синхронизироваться? - PullRequest
0 голосов
/ 30 сентября 2019

Здравствуйте. Я хочу синхронизировать два потока: один увеличивает переменную, а другой уменьшает ее. Результат, который я хочу, выглядит следующим образом:

Thread #0 j = 1

Thread #1 j = 0

Thread #0 j = 1

Thread #1 j = 0

И так далее ... но мой код иногда работает так, в некоторых случаях он выводит действительно странные значения. Я полагаю, что у меня где-то есть неопределенное поведение, но я не могу понять, что на самом деле происходит.

Мой код состоит из HANDLE ghMutex, который связывается с обработчиком моего мьютекса:

Моя основная функция:

int main(void)
{
    HANDLE aThread[THREADCOUNT];

    ghMutex = CreateMutex(NULL, FALSE, NULL);             

    aThread[0] = (HANDLE)_beginthreadex(NULL, 0, &inc, NULL, CREATE_SUSPENDED, 0);
    aThread[1] = (HANDLE)_beginthreadex(NULL, 0, &dec, NULL, CREATE_SUSPENDED, 0);

    ResumeThread(aThread[0]);
    ResumeThread(aThread[1]);

    WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);

    printf("j = %d\n", j);

    for (int i = 0; i < THREADCOUNT; i++)
        CloseHandle(aThread[i]);

    CloseHandle(ghMutex);

    return 0;
}

Функция Inc:

unsigned int __stdcall inc(LPVOID)
{
    for (volatile int i = 0; i < MAX; ++i)
    {
        WaitForSingleObject(
            ghMutex,    // handle to mutex
            INFINITE);  // no time-out interval

            j++;
            printf("Thread %d j = %d\n", GetCurrentThreadId(), j);
            ReleaseMutex(ghMutex);

    }
    _endthread();

    return TRUE;
}

Функция Dec:

unsigned int __stdcall dec(void*)
{
    for (volatile int i = 0; i < MAX; ++i)
    {
        WaitForSingleObject(
            ghMutex,    // handle to mutex
            INFINITE);  // no time-out interval

        j--;
        printf("Thread %d j = %d\n", GetCurrentThreadId(), j);
        ReleaseMutex(ghMutex);
    }
    _endthread();

    return TRUE;
}

Мне нужно решение Win API для API в std c ++ 98.

Ответы [ 2 ]

3 голосов
/ 30 сентября 2019

Мьютекс не является правильным инструментом для синхронизации двух потоков, он существует для защиты ресурса. У вас есть ресурс j, который защищен вашим мьютексом, однако последовательность того, какой поток получает блокировку, не определена, поэтому вы можете иметь случай, когда dec вызывается несколько раз, прежде чем inc сможет запустить.

Если вы хотите синхронизировать порядок потоков, вам придется использовать другой примитив синхронизации, например семафор. Например, вы можете увеличить семафор в inc и уменьшить его в dec. Это будут классические отношения производитель - потребитель, когда производитель будет остановлен, когда семафор достигнет своего максимального значения, а потребитель будет ждать поступления элементов.

Извините, у меня нет решения WinAPI C ++ 98, потому чтоэто было бы глупо, но я надеюсь, что указал вам правильное направление.

0 голосов
/ 30 сентября 2019

объект windows mutex гарантирует исключительное право собственности, но не заботится о порядке владения. так что один и тот же поток может захватывать несколько раз подряд, в то время как другие будут ждать.

для вашей задачи вам нужно дать сигнал другому потоку, когда ваша задача будет выполнена, а затем дождаться сигнала от другого потока. Для этой задачи может быть использована пара событий, например. нить (i) сигнализировать событие (1-i) и ждать события (i) . для оптимизации вместо 2 вызовов -

SetEvent(e[1-i]); WaitForSingleObject(e[i], INFINITE);

мы можем использовать один вызов SignalObjectAndWait

SignalObjectAndWait(e[1-i], e[i], INFINITE, FALSE)

Конечно, начало и конец цикла требуют особого внимания. для inc

    HANDLE hObjectToSignal  = _hEvent[1], hObjectToWaitOn  = _hEvent[0];

    for (;;)
    {
        _shared_value++;

        if (!--n)
        {
            SetEvent(hObjectToSignal);
            break;
        }

        SignalObjectAndWait(hObjectToSignal, hObjectToWaitOn, INFINITE, FALSE);
    }

и для dec

    HANDLE hObjectToSignal  = _hEvent[0], hObjectToWaitOn  = _hEvent[1];

    WaitForSingleObject(hObjectToWaitOn, INFINITE);
    for (;;)
    {
        --_shared_value;

        if (!--n)
        {
            break;
        }

        SignalObjectAndWait(hObjectToSignal, hObjectToWaitOn, INFINITE, FALSE);
    }

, если написать полный тест, с проверкой ошибок

struct Task 
{
    HANDLE _hEvent[4];
    ULONG _n;
    LONG _iTasks;
    LONG _shared_value;

    Task()
    {
        RtlZeroMemory(this, sizeof(*this));
    }

    ~Task()
    {
        ULONG n = RTL_NUMBER_OF(_hEvent);
        do 
        {
            if (HANDLE hEvent = _hEvent[--n]) CloseHandle(hEvent);
        } while (n);
    }

    ULONG WaitTaskEnd()
    {
        return WaitForSingleObject(_hEvent[2], INFINITE);
    }

    ULONG WaitTaskReady()
    {
        return WaitForSingleObject(_hEvent[3], INFINITE);
    }

    void SetTaskReady()
    {
        SetEvent(_hEvent[3]);
    }

    void End()
    {
        if (!InterlockedDecrement(&_iTasks)) SetEvent(_hEvent[2]);
    }

    void Begin()
    {
        InterlockedIncrementNoFence(&_iTasks);
    }

    static ULONG WINAPI IncThread(PVOID p)
    {
        return reinterpret_cast<Task*>(p)->Inc(), 0;
    }

    void Inc()
    {
        if (WaitTaskReady() == WAIT_OBJECT_0)
        {
            if (ULONG n = _n)
            {
                HANDLE hObjectToSignal  = _hEvent[1], hObjectToWaitOn  = _hEvent[0];

                for (;;)
                {
                    if (_shared_value) __debugbreak();

                    if (n < 17) DbgPrint("Inc(%u)\n", n);

                    _shared_value++;

                    if (!--n)
                    {
                        SetEvent(hObjectToSignal);
                        break;
                    }

                    if (SignalObjectAndWait(hObjectToSignal, hObjectToWaitOn, INFINITE, FALSE) != WAIT_OBJECT_0)
                    {
                        break;
                    }
                }
            }
        }

        End();
    }

    static ULONG WINAPI DecThread(PVOID p)
    {
        return reinterpret_cast<Task*>(p)->Dec(), 0;
    }

    void Dec()
    {
        if (WaitTaskReady() == WAIT_OBJECT_0)
        {
            if (ULONG n = _n)
            {
                HANDLE hObjectToSignal  = _hEvent[0], hObjectToWaitOn  = _hEvent[1];

                if (WaitForSingleObject(hObjectToWaitOn, INFINITE) == WAIT_OBJECT_0)
                {
                    for (;;)
                    {
                        --_shared_value;

                        if (_shared_value) __debugbreak();

                        if (n < 17) DbgPrint("Dec(%u)\n", n);

                        if (!--n)
                        {
                            break;
                        }

                        if (SignalObjectAndWait(hObjectToSignal, hObjectToWaitOn, INFINITE, FALSE) != WAIT_OBJECT_0)
                        {
                            break;
                        }
                    }
                }
            }
        }

        End();
    }

    ULONG Create()
    {
        ULONG n = RTL_NUMBER_OF(_hEvent);
        do 
        {
            if (HANDLE hEvent = CreateEventW(0, n > 2, 0, 0)) _hEvent[--n] = hEvent; 
            else return GetLastError();
        } while (n);

        return NOERROR;
    }

    ULONG Start()
    {
        static PTHREAD_START_ROUTINE aa[] = { IncThread, DecThread };

        ULONG n = RTL_NUMBER_OF(aa);

        do 
        {
            Begin();
            if (HANDLE hThread = CreateThread(0, 0, aa[--n], this, 0, 0))
            {
                CloseHandle(hThread);
            }
            else
            {
                n = GetLastError();
                End();
                return n;
            }
        } while (n);

        return NOERROR;
    }

    ULONG Start(ULONG n)
    {
        _iTasks = 1;

        ULONG dwError = Start();

        _n = dwError ? 0 : n;

        SetTaskReady();

        End();

        return dwError;
    }
};

void TaskTest(ULONG n)
{
    Task task;
    if (task.Create() == NOERROR)
    {
        task.Start(n);
        task.WaitTaskEnd();
    }
}

заметьте, что никакого смыслаобъявить локальную переменную (которая будет доступна только из одного потока и недоступна для прерываний и т. д.) как volatile

также когда мы пишем код, например:

// thread #1
write_shared_data();
SetEvent(hEvent);

// thread #2
WaitForSingleObject(hEvent, INFINITE);
read_shared_data();

внутри SetEvent (hEvent);была атомарная запись в состояние события с выпуском семантики (действительно более сильной, конечно) и ожиданием функции события - атомарное чтение его состояния с более чем приобретением семантики. в результате все, что поток № 1 записывает в память до SetEvent - будет видно потоку № 2 после ожидания события (если ожидание закончилось в результате вызова Set из потока № 1)

...