Подождите, пока переменная не станет нулевой - PullRequest
0 голосов
/ 10 апреля 2020

Я пишу многопоточную программу, которая может выполнять некоторые задачи в отдельных потоках.

Некоторые операции требуют их ожидания в конце выполнения моей программы. Я написал простую защиту для таких «важных» операций:

class CPendingOperationGuard final
{
public: 
    CPendingOperationGuard()
    {
        InterlockedIncrementAcquire( &m_ullCounter );
    }

    ~CPendingOperationGuard()
    {
        InterlockedDecrementAcquire( &m_ullCounter );
    }

    static bool WaitForAll( DWORD dwTimeOut )
    {
        // Here is a topic of my question
        // Return false on timeout
        // Return true if wait was successful
    }

private:
    static volatile ULONGLONG m_ullCounter;
};

Использование простое:

void ImportantTask()
{
    CPendingOperationGuard guard;
    // Do work
}

// ...

void StopExecution()
{
    if(!CPendingOperationGuard::WaitForAll( 30000 )) {
        // Handle error
    }
}

Вопрос в том, как эффективно ждать, пока m_ullCounter не станет равным нулю. или до истечения времени ожидания.

У меня есть две идеи:

  1. Чтобы запустить эту функцию в другом отдельном потоке и написать WaitForSingleObject( hThread, dwTimeout ):

    DWORD WINAPI WaitWorker( LPVOID )
    {
        while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 ))
            ;
    }
    

    Но он «съест» почти 100% процессорного времени - плохая идея.

  2. Вторая идея - разрешить запуск других потоков:

    DWORD WINAPI WaitWorker( LPVOID )
    {
        while(InterlockedCompareExchangeRelease( &m_ullCounter, 0, 0 ))
            Sleep( 0 );
    }
    

    Но это ' Переключу контекст выполнения в режим ядра и обратно - слишком дорого в майской задаче. Плохая идея тоже

Вопрос:
Как выполнить почти нулевые накладные расходы, ожидая, пока моя переменная не станет равной нулю? Может быть, без отдельного потока ... Основным условием является поддержка остановки ожидания по таймауту.

Может быть, кто-то может предложить совершенно другую идею для моей задачи - ждать всех зарегистрированных операций (как в ThreadPools WinAPI) - его API имеет, например, WaitForThreadpoolWaitCallbacks для ожидания всех зарегистрированных задач).

PS: невозможно переписать мой код с API-интерфейсом ThreadPool :(

Ответы [ 2 ]

2 голосов
/ 11 апреля 2020

Посмотрите на функции WaitOnAddress() и WakeByAddressSingle() / WakeByAddressAll(), представленные в Windows 8.

Например:

class CPendingOperationGuard final
{
public: 
    CPendingOperationGuard()
    {
        InterlockedIncrementAcquire(&m_ullCounter);
        Wake­By­Address­All(&m_ullCounter);
    }

    ~CPendingOperationGuard()
    {
        InterlockedDecrementAcquire(&m_ullCounter);
        Wake­By­Address­All(&m_ullCounter);
    }

    static bool WaitForAll( DWORD dwTimeOut )
    {
        ULONGLONG Captured, Now, Deadline = GetTickCount64() + dwTimeOut;
        DWORD TimeRemaining;
        do
        {
            Captured = InterlockedExchangeAdd64((LONG64 volatile *)&m_ullCounter, 0);
            if (Captured == 0) return true;
            Now = GetTickCount64();
            if (Now >= Deadline) return false;
            TimeRemaining = static_cast<DWORD>(Deadline - Now);
        }
        while (WaitOnAddress(&m_ullCounter, &Captured, sizeof(ULONGLONG), TimeRemaining));
        return false;
    }

private:
    static volatile ULONGLONG m_ullCounter;
};

Раймонд Чен написал серию статей в блогах об этих функциях:

WaitOnAddress позволяет создавать объект синхронизации из любой переменной данных, даже байта

Реализация критической секции в терминах WaitOnAddress

Ложные пробуждения, условия гонки и поддельные утверждения FIFO: взгляд за кулисами WaitOnAddress

Расширение нашего критического раздела на основе WaitOnAddress для поддержки тайм-аутов

Сравнение WaitOnAddress с фьютексами (futexi? Futexen?)

Создание семафора из WaitOnAddress

Создание семафора с максимальным счетом из WaitOnAddress

Создание события сброса вручную из WaitOnAddress

Создание автомата c -r Событие eset из WaitOnAddress

Вспомогательная функция шаблона для ожидания WaitOnAddress в al oop

1 голос
/ 10 апреля 2020

вам нужно для этой задачи что-то вроде Защита от понижения вместо CPendingOperationGuard

перед началом работы, вы вызываете ExAcquireRundownProtection и только если возвращается TRUE - начать выполнение операции. в конце вы должны позвонить ExReleaseRundownProtection

, поэтому шаблон должен быть следующим

if (ExAcquireRundownProtection(&RunRef)) {
    do_operation();
    ExReleaseRundownProtection(&RunRef);
}

, если вы хотите остановить этот процесс и дождаться всех активных вызовов do_operation(); закончено - вы вызываете ExWaitForRundownProtectionRelease (вместо WaitWorker)

После вызова ExWaitForRundownProtectionRelease процедура ExAcquireRundownProtection вернет FALSE (поэтому новые операции не начнется после этого). ExWaitForRundownProtectionRelease ожидает возврата до тех пор, пока все не вызовут подпрограмму ExReleaseRundownProtection, чтобы снять ранее приобретенную защиту от отказа (так что, когда все текущие (если существуют) операции завершены). Когда все ожидающие обращения завершены, ExWaitForRundownProtectionRelease возвращает

, к сожалению, этот API реализован системой только в режиме ядра и не имеет аналогов в режиме пользователя. однако не трудно реализовать такую ​​идею самостоятельно

это мой пример:

enum RundownState {
    v_complete = 0, v_init = 0x80000000
};

template<typename T>
class RundownProtection
{
    LONG _Value;

public:

    _NODISCARD BOOL IsRundownBegin()
    {
        return 0 <= _Value;
    }

    _NODISCARD BOOL AcquireRP()
    {
        LONG Value, NewValue;

        if (0 > (Value = _Value))
        {
            do 
            {
                NewValue = InterlockedCompareExchangeNoFence(&_Value, Value + 1, Value);

                if (NewValue == Value) return TRUE;

            } while (0 > (Value = NewValue));
        }

        return FALSE;
    }

    void ReleaseRP()
    {
        if (InterlockedDecrement(&_Value) == v_complete)
        {
            static_cast<T*>(this)->RundownCompleted();
        }
    }

    void Rundown_l()
    {
        InterlockedBitTestAndResetNoFence(&_Value, 31);
    }

    void Rundown()
    {
        if (AcquireRP())
        {
            Rundown_l();
            ReleaseRP();
        }
    }

    RundownProtection(RundownState Value = v_init) : _Value(Value)
    {
    }

    void Init()
    {
        _Value = v_init;
    }
};

///////////////////////////////////////////////////////////////

class OperationGuard : public RundownProtection<OperationGuard>
{
    friend RundownProtection<OperationGuard>;

    HANDLE _hEvent;

    void RundownCompleted()
    {
        SetEvent(_hEvent);
    }

public:

    OperationGuard() : _hEvent(0) {}

    ~OperationGuard() 
    {
        if (_hEvent)
        {
            CloseHandle(_hEvent);
        }
    }

    ULONG WaitComplete(ULONG dwMilliseconds = INFINITE)
    {
        return WaitForSingleObject(_hEvent, dwMilliseconds);
    }

    ULONG Init()
    {
        return (_hEvent = CreateEvent(0, 0, 0, 0)) ? NOERROR : GetLastError();
    }
} g_guard;

//////////////////////////////////////////////

ULONG CALLBACK PendingOperationThread(void*)
{
    while (g_guard.AcquireRP())
    {
        Sleep(1000);// do operation
        g_guard.ReleaseRP();
    }

    return 0;
}

void demo()
{
    if (g_guard.Init() == NOERROR)
    {
        if (HANDLE hThread = CreateThread(0, 0, PendingOperationThread, 0, 0, 0))
        {
            CloseHandle(hThread);
        }

        MessageBoxW(0, 0, L"UI Thread", MB_ICONINFORMATION|MB_OK);

        g_guard.Rundown();

        g_guard.WaitComplete();
    }
}

зачем просто ждать, когда ждать, пока m_ullCounter станет нулевым, недостаточно

если мы читаем 0 из m_ullCounter, это означает, что только в это время нет активной операции. но ожидающая операция может начаться уже , после мы проверяем, что m_ullCounter == 0. мы можем использовать специальный флаг (скажем, bool g_bQuit) и установить его. Операция перед началом проверяет этот флаг и не начинается, если это правда. но в любом случае недостаточно

наивный код:

// рабочий поток

if (!g_bQuit) // (1)
{
    // MessageBoxW(0, 0, L"simulate delay", MB_ICONWARNING);

    InterlockedIncrement(&g_ullCounter); // (4)
    // do operation
    InterlockedDecrement(&g_ullCounter); // (5)
}

// здесь мы ждем всех выполненных операций

    g_bQuit = true; // (2)

    // wait on g_ullCounter == 0, how - not important
    while (g_ullCounter) continue; // (3)
  • ожидающая операция, проверяющая флаг g_bQuit (1) - он еще не установлен, поэтому он начинается
  • рабочий поток заменяется (используйте MessageBox для имитации этого)
  • we set g_bQuit = true; // (2)
  • мы проверяем / ждем g_ullCounter == 0, это 0, поэтому мы выходим (3)
  • , рабочий поток пробуждается (возврат из MessageBox) и увеличивается g_ullCounter (4)

проблема здесь в том, что операция может использовать некоторые ресурсы, которые мы уже начинаем уничтожать после g_ullCounter == 0

, это происходит из-за проверки флага выхода (g_Quit) и счетчика приращений после этого не атомы c - может быть разрыв между ними.

для правильного решения необходим атоми c доступ к флажку + счетчику. это и делает защиту от изношений. для флага + счетчика используется одиночная LONG переменная (32 бита), потому что мы можем сделать атоми c доступ к ней. 31 бит используется для счетчика и 1 бит используется для флага выхода. windows решение использует 0 бит для флага (1 означает выход) и [1..31] бит для счетчика. я использую биты [0..30] для счетчика и 31 бит для флага (0 означает выход). искать

...