Как можно безопасно завершить рабочий поток в середине вызова WaitForMultipleObjects из основного потока? - PullRequest
0 голосов
/ 18 апреля 2020

Привет! Я использую C ++ с каркасом Qt для программы Windows. Я использую потоки Qt, но эта ситуация может иметь отношение и к другим API потоков. Я выделяю рабочий поток для отслеживания изменений в каталогах, используя ReadDirectoryChangesW и WaitForMultipleObjects из Win API. Я хочу иметь возможность отменить рабочий поток изящно из основного потока. Я читал о CancellIOEx , который принимает дескриптор и параметр OVERLAPPED, но оба типа данных являются указателями. Есть ли какой-нибудь безопасный способ безопасно передать эти указатели из рабочего потока в основной поток? Есть ли лучший способ сделать что-то?

Вот некоторый код из здесь , но с использованием WaitForSingleObject вместо WaitForMultipleObjects функция будет вызываться из рабочего потока. Могу ли я разместить этот код по ссылке? Также см. здесь для получения ценной информации о ReadDirectoryChangesW за пределами Windows Dev Center.

Спасибо!

void WatchDirectory(LPCWSTR path)
{
   char buf[2048];
   DWORD nRet;
   BOOL result=TRUE;
   char filename[MAX_PATH];
   DirInfo[0].hDir = CreateFile (path, GENERIC_READ|FILE_LIST_DIRECTORY, 
                                 FILE_SHARE_READ|FILE_SHARE_WRITE|FILE_SHARE_DELETE,
                                 NULL, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS|FILE_FLAG_OVERLAPPED,
                                 NULL);

   if(DirInfo[0].hDir == INVALID_HANDLE_VALUE)
   {
       return; //cannot open folder
   }

   lstrcpy( DirInfo[0].lpszDirName, path);
   OVERLAPPED PollingOverlap;

   FILE_NOTIFY_INFORMATION* pNotify;
   int offset;
   PollingOverlap.OffsetHigh = 0;
   PollingOverlap.hEvent = CreateEvent(NULL,TRUE,FALSE,NULL);
   while(result)
   {
       result = ReadDirectoryChangesW(
                  DirInfo[0].hDir,// handle to the directory to be watched
                  &buf,// pointer to the buffer to receive the read results
                  sizeof(buf),// length of lpBuffer
                  TRUE,// flag for monitoring directory or directory tree
                  FILE_NOTIFY_CHANGE_FILE_NAME |
                  FILE_NOTIFY_CHANGE_DIR_NAME |
                  FILE_NOTIFY_CHANGE_SIZE,
                //FILE_NOTIFY_CHANGE_LAST_WRITE |
                //FILE_NOTIFY_CHANGE_LAST_ACCESS |
                //FILE_NOTIFY_CHANGE_CREATION,
                &nRet,// number of bytes returned
                &PollingOverlap,// pointer to structure needed for overlapped I/O
                NULL);

       WaitForSingleObject(PollingOverlap.hEvent,INFINITE);
       offset = 0;
       int rename = 0;
       char oldName[260];
       char newName[260];
       do
       {
           pNotify = (FILE_NOTIFY_INFORMATION*)((char*)buf + offset);
           strcpy(filename, "");
           int filenamelen = WideCharToMultiByte(CP_ACP, 0, pNotify->FileName, pNotify->FileNameLength/2, filename, sizeof(filename), NULL, NULL);
           filename[pNotify->FileNameLength/2] = '';
           switch(pNotify->Action)
           {
               case FILE_ACTION_ADDED:
                   printf("\nThe file is added to the directory: [%s] \n", filename);
                   break;
               case FILE_ACTION_REMOVED:
                   printf("\nThe file is removed from the directory: [%s] \n", filename);
                   break;
               case FILE_ACTION_MODIFIED:
                   printf("\nThe file is modified. This can be a change in the time stamp or attributes: [%s]\n", filename);
                   break;
               case FILE_ACTION_RENAMED_OLD_NAME:
                   printf("\nThe file was renamed and this is the old name: [%s]\n", filename);
                   break;
               case FILE_ACTION_RENAMED_NEW_NAME:
                   printf("\nThe file was renamed and this is the new name: [%s]\n", filename);
                   break;
               default:
                   printf("\nDefault error.\n");
                   break;
            }

           offset += pNotify->NextEntryOffset;

        }while(pNotify->NextEntryOffset); //(offset != 0);
      }

    CloseHandle( DirInfo[0].hDir );

}

Ответы [ 2 ]

1 голос
/ 18 апреля 2020

Основной поток может создать структуру OVERLAPPED и передать ее для использования потоком, а не наоборот. Однако попытка отменить ввод-вывод из основного потока будет в любом случае условием гонки. Поскольку ваш рабочий поток должен делать новый вызов ReadDirectoryChangesEx() после каждого события каталога, он может быть между вызовами ReadDirectoryChangesEx(), когда основной поток хочет завершить рабочий поток, вызывая, таким образом, CancelIoEx() когда нет ввода-вывода в процессе, будет неактивным.

Вместо этого создайте еще один объект события для основного потока и рабочего потока для совместного использования, в дополнение к объекту события, который вы создаете для ввода-вывода. Пусть рабочий поток ожидает оба события одновременно с WaitForMultipleObjects(), и тогда основной поток может сообщить об общем событии, когда он хочет завершить рабочий поток.

WaitForMultipleObjects() сообщит рабочему потоку какое событие было сигнализировано. Если сообщается о событии общего доступа, рабочий поток может отменить свой ввод / вывод в процессе выполнения с помощью CancelIo/Ex() перед выходом.

// shared with both threads...
HANDLE hTermEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
// in main thread...
HANDLE hWorkerThread = CreateThread(...);
...
SetEvent(hTermEvent);
WaitForSingleObject(hWorkerThread, INFINITE);
// called by worker thread...
void WatchDirectory(LPCWSTR path)
{
   DWORD buf[512];
   DWORD nRet, dwRet;
   char filename[MAX_PATH];
   DirInfo[0].hDir = CreateFile(path, GENERIC_READ | FILE_LIST_DIRECTORY, 
                                 FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
                                 NULL, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS | FILE_FLAG_OVERLAPPED,
                                 NULL);

   if (DirInfo[0].hDir == INVALID_HANDLE_VALUE)
   {
       return; //cannot open folder
   }

   lstrcpy(DirInfo[0].lpszDirName, path);

   OVERLAPPED PollingOverlap = {};
   PollingOverlap.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
   if (!PollingOverlap.hEvent)
   {
       return; //cannot create I/O event to wait on
   }

   FILE_NOTIFY_INFORMATION* pNotify;
   int offset;

   do
   {
       if (!ReadDirectoryChangesW(
              DirInfo[0].hDir,// handle to the directory to be watched
              &buf,// pointer to the buffer to receive the read results
              sizeof(buf),// length of lpBuffer
              TRUE,// flag for monitoring directory or directory tree
              FILE_NOTIFY_CHANGE_FILE_NAME |
              FILE_NOTIFY_CHANGE_DIR_NAME |
              FILE_NOTIFY_CHANGE_SIZE,
              //FILE_NOTIFY_CHANGE_LAST_WRITE |
              //FILE_NOTIFY_CHANGE_LAST_ACCESS |
              //FILE_NOTIFY_CHANGE_CREATION,
              &nRet,// number of bytes returned
              &PollingOverlap,// pointer to structure needed for overlapped I/O
              NULL))
       {
           break; // can't wait for an event
       }

       HANDLE events[] = {hTermEvent, PollingOverlap.hEvent};

       dwRet = WaitForMultipleObjects(2, events, FALSE, INFINITE);
       if (dwRet != (WAIT_OBJECT_0 + 1))
       {
           CancelIo(DirInfo[0].hDir);
           GetOverlappedResult(DirInfo[0].hDir, &PollingOverlap, &nRet, TRUE);
           break; // terminate requested, or wait failed
       }

       if (!GetOverlappedResult(DirInfo[0].hDir, &PollingOverlap, &nRet, TRUE))
       {
           break; // read failed
       }

       if (nRet == 0)
       {
           continue; // overflow, current event data discarded
       }

       offset = 0;
       int rename = 0;
       char oldName[MAX_PATH];
       char newName[MAX_PATH];
       do
       {
           pNotify = (FILE_NOTIFY_INFORMATION*) (buf + offset);
           int filenamelen = WideCharToMultiByte(CP_ACP, 0, pNotify->FileName, pNotify->FileNameLength/2, filename, sizeof(filename), NULL, NULL);
           switch (pNotify->Action)
           {
               case FILE_ACTION_ADDED:
                   printf("\nThe file is added to the directory: [%.*s] \n", filenamelen, filename);
                   break;
               case FILE_ACTION_REMOVED:
                   printf("\nThe file is removed from the directory: [%.*s] \n", filenamelen, filename);
                   break;
               case FILE_ACTION_MODIFIED:
                   printf("\nThe file is modified. This can be a change in the time stamp or attributes: [%.*s]\n", filenamelen, filename);
                   break;
               case FILE_ACTION_RENAMED_OLD_NAME:
                   printf("\nThe file was renamed and this is the old name: [%.*s]\n", filenamelen, filename);
                   break;
               case FILE_ACTION_RENAMED_NEW_NAME:
                   printf("\nThe file was renamed and this is the new name: [%.*s]\n", filenamelen, filename);
                   break;
               default:
                   printf("\nDefault error.\n");
                   break;
           }

           offset += pNotify->NextEntryOffset;
       }
       while (pNotify->NextEntryOffset);
   }
   while (true);

   CloseHandle(PollingOverlap.hEvent);
   CloseHandle(DirInfo[0].hDir);
}
0 голосов
/ 18 апреля 2020

завершить поток никогда не безопасно. как минимум это приведет к утечке ресурсов. и главное - вы никогда не сможете узнать, в каком месте был поток, когда вы его прервали. это может быть, например, внутри критической секции alloc / free. и прекратить его в этот момент приводят к тупику при следующей операции кучи (потому что критическая секция никогда не будет освобождена).

Однако существует много правильных решений, как остановить ввод / вывод. Можно, конечно, использовать 2 специальных события, как уже описано в комментариях, но, на мой взгляд, это не лучшее решение

1) мы можем использовать CancelIoEx для дескриптора файла. конечно, просто вызвать CancelIoEx недостаточно - потому что в это время не может быть никакого активного ввода-вывода в выделенном потоке. Еще нужно использовать специальный флаг (_bQuit) для отмены задания, но даже этого недостаточно. нужно проверить / установить этот флаг внутри критической секции или защиты от ранда с помощью ReadDirectoryChangesW/CancelIoEx

в выделенном потоке

AcquireSRWLockExclusive(this);

if (!_bQuit) // (1)
{
    ReadDirectoryChangesW(*); // (4)
}

ReleaseSRWLockExclusive(this);

и для остановки

AcquireSRWLockExclusive(this);

_bQuit = true; // (2)
CancelIoEx(*); (3)

ReleaseSRWLockExclusive(this);

без критической секции В противном случае будет возможно выполнение защиты в рандомном порядке в следующем порядке:

if (!_bQuit) // (1)
_bQuit = true; // (2)
CancelIoEx(*); (3)
ReadDirectoryChangesW(*); // (4)

может возникнуть ситуация, когда сработавший поток сначала проверит флаг _bQuit и он все еще будет ложным. затем основной поток установит флаг и вызовет CancelIoEx, что не будет иметь никакого эффекта, потому что нет ввода-вывода в файл. и только потом сработал вызов потока ReadDirectoryChangesW, который не будет отменен. но используя критическую секцию (в широком смысле), мы делаем это невозможным. так что возможно только 2 заказа: или

if (!_bQuit) ReadDirectoryChangesW(*); // (1)
_bQuit = true; CancelIoEx(*); // (2)

в этом случае ReadDirectoryChangesW будет отменено на CancelIoEx

или

_bQuit = true; CancelIoEx(*); // (1)
if (!_bQuit) ReadDirectoryChangesW(*); // (2)

в этом случае сработавшая нить посмотреть _bQuit флаг установлен и не вызывать ReadDirectoryChangesW больше.

в полном коде это может выглядеть так:

inline ULONG BOOL_TO_ERROR(BOOL f)
{
    return f ? NOERROR : GetLastError();
}

struct WatchFolder : SRWLOCK
{
    HANDLE _hThread, _hFile;
    BOOLEAN _bQuit;

    WatchFolder() : _hThread(0), _hFile(0), _bQuit(false)
    {
        InitializeSRWLock(this);
    }

    ~WatchFolder()
    {
        if (_hThread) {
            WaitForSingleObject(_hThread, INFINITE);
            CloseHandle(_hThread);
        }
        if (_hFile) CloseHandle(_hFile);
    }

    static ULONG CALLBACK _WatchDirectory(PVOID This)
    {
        reinterpret_cast<WatchFolder*>(This)->WatchDirectory();
        return 0;
    }

    void WatchDirectory()
    {
        OVERLAPPED ov {};

        if (ov.hEvent = CreateEvent(0, 0, 0, 0))
        {
            union {
                FILE_NOTIFY_INFORMATION fni;
                char buf[0x800];// must be aligned as FILE_NOTIFY_INFORMATION
            };

            for(;;) 
            {
                AcquireSRWLockExclusive(this);

                ULONG dwError = _bQuit ? ERROR_OPERATION_ABORTED : BOOL_TO_ERROR(
                    ReadDirectoryChangesW(_hFile, buf, sizeof(buf), TRUE, FILE_NOTIFY_VALID_MASK, 0, &ov, 0));

                ReleaseSRWLockExclusive(this);

                ULONG NumberOfBytesTransferred = 0;

                if (dwError == NOERROR)
                {
                    dwError = BOOL_TO_ERROR(GetOverlappedResult(_hFile, &ov, &NumberOfBytesTransferred, TRUE));
                }

                if (dwError || !NumberOfBytesTransferred)
                {
                    if (dwError != ERROR_OPERATION_ABORTED)
                    {
                        __nop();
                    }
                    break;
                }

                FILE_NOTIFY_INFORMATION* pNotify = &fni;

                ULONG NextEntryOffset = 0;
                do 
                {
                    (PBYTE&)pNotify += NextEntryOffset;

                    DbgPrint("%x %.*S\n", pNotify->Action, pNotify->FileNameLength / sizeof(WCHAR), pNotify->FileName);

                } while (NextEntryOffset = pNotify->NextEntryOffset);
            }

            CloseHandle(ov.hEvent);
        }
    }

    ULONG Start(PCWSTR szFile)
    {
        HANDLE hFile = CreateFileW(szFile, FILE_GENERIC_READ, FILE_SHARE_VALID_FLAGS,
            NULL, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS|FILE_FLAG_OVERLAPPED, NULL);

        ULONG dwError;

        if (hFile != INVALID_HANDLE_VALUE)
        {
            if (_hThread = CreateThread(0, 0, _WatchDirectory, this, 0, 0))
            {
                _hFile = hFile;

                return NOERROR;
            }
            dwError = GetLastError();
            CloseHandle(hFile);
        }
        else
        {
            dwError = GetLastError();
        }

        return dwError;
    }

    void Stop()
    {
        AcquireSRWLockExclusive(this);

        _bQuit = true, CancelIoEx(_hFile, 0);

        ReleaseSRWLockExclusive(this);
    }
};

void test()
{
    WatchFolder wf;
    if (wf.Start(L"somepath") == NOERROR)
    {
        MessageBoxW(0,0,0,0);
        wf.Stop();
    }
}

2) другим способом сделать это просто позвонив CloseHandle(_hFile) вместо CancelIoEx(_hFile, 0);. когда дескриптор (последний, но предполагается, что у вас только один дескриптор) закрыт - завершение системы завершено ReadDirectoryChangesW со статусом STATUS_NOTIFY_CLEANUP. код будет очень похож на случай CancelIoEx за исключением того, что теперь ошибка при завершении будет ERROR_NOTIFY_CLEANUP вместо ERROR_OPERATION_ABORTED. но если использовать GetOverlappedResult[Ex] существует проблема - этот API имеет ошибку в реализации - он потерял все положительные значения статуса. это просто потерян STATUS_NOTIFY_CLEANUP (но мы, конечно, можем просмотреть его в Internal поле OVERLAPPED. код может быть следующим:

            AcquireSRWLockExclusive(this);

            ULONG dwError = _bQuit ? ERROR_NOTIFY_CLEANUP : BOOL_TO_ERROR(
                ReadDirectoryChangesW(_hFile, buf, sizeof(buf), TRUE, FILE_NOTIFY_VALID_MASK, 0, &ov, 0));

            ReleaseSRWLockExclusive(this);

            ULONG NumberOfBytesTransferred = 0;

            if (dwError == NOERROR)
            {
                dwError = BOOL_TO_ERROR(GetOverlappedResult(_hFile, &ov, &NumberOfBytesTransferred, TRUE));
                // fix for error in GetOverlappedResult
                if (dwError == NOERROR && ov.Internal) dwError = RtlNtStatusToDosError((NTSTATUS)ov.Internal);
            }

            if (dwError || !NumberOfBytesTransferred)
            {
                if (dwError != ERROR_NOTIFY_CLEANUP)
                {
                    __nop();
                }
                break;
            }

и для остановки

    AcquireSRWLockExclusive(this);

    _bQuit = true, CloseHandle(_hFile), _hFile = 0;

    ReleaseSRWLockExclusive(this);

3) еще один вариант - использовать ожидаемое ожидание внутри GetOverlappedResultEx и вставить ap c (или предупредить рабочий поток). в этом случае нам не нужно использовать критическую секцию / или защиту от ранения - потому что независимо от того, будет ли вставлен пункт ap c (или оповещение) до или после вызова ReadDirectoryChangesW, - в любом случае он будет прерван.

            ULONG dwError = _bQuit ? STATUS_USER_APC : BOOL_TO_ERROR(
                ReadDirectoryChangesW(_hFile, buf, sizeof(buf), TRUE, FILE_NOTIFY_VALID_MASK, 0, &ov, 0));

            ULONG NumberOfBytesTransferred = 0;

            if (dwError == NOERROR)
            {
                dwError = BOOL_TO_ERROR(GetOverlappedResultEx(_hFile, &ov, &NumberOfBytesTransferred, INFINITE, TRUE));
            }

            if (dwError || !NumberOfBytesTransferred)
            {
                if (dwError == STATUS_USER_APC)
                {
                    CancelIo(_hFile);
                    GetOverlappedResult(_hFile, &ov, &NumberOfBytesTransferred, TRUE);
                }
                break;
            }

и для остановки нам нужно

static VOID NTAPI dummyAPC(_In_ ULONG_PTR )
{

}

_bQuit = true;
QueueUserAPC(dummyAPC, _hThread, 0);

, конечно, вместо вызова dummyAPC (что не нужно) лучше использовать оповещение, но GetOverlappedResultEx (точнее WaitForSingleObjectEx) игнорировать STATUS_ALERT и снова начинать ждать после него прервано STATUS_ALERT. поэтому нужно использовать пользовательский код здесь

ULONG
WINAPI
GetOverlappedResult2( _In_ LPOVERLAPPED lpOverlapped,
                      _Out_ PULONG_PTR lpNumberOfBytesTransferred)
{
    while (lpOverlapped->Internal == STATUS_PENDING)
    {
        if (NTSTATUS status = ZwWaitForSingleObject(lpOverlapped->hEvent, TRUE, 0))
        {
            return RtlNtStatusToDosError(status);
        }
    }

    KeMemoryBarrier();

    *lpNumberOfBytesTransferred = lpOverlapped->InternalHigh;

    return RtlNtStatusToDosError((NTSTATUS)lpOverlapped->Internal);
}

и с ним можно использовать следующий код:

            ULONG dwError = _bQuit ? ERROR_ALERTED : BOOL_TO_ERROR(
                ReadDirectoryChangesW(_hFile, buf, sizeof(buf), TRUE, FILE_NOTIFY_VALID_MASK, 0, &ov, 0));

            ULONG_PTR NumberOfBytesTransferred = 0;

            if (dwError == NOERROR)
            {
                dwError = GetOverlappedResult2(&ov, &NumberOfBytesTransferred);
            }

            if (dwError || !NumberOfBytesTransferred)
            {
                if (dwError == ERROR_ALERTED)
                {
                    CancelIo(_hFile);
                    GetOverlappedResult(_hFile, &ov, (ULONG*)&NumberOfBytesTransferred, TRUE);
                }
                break;
            }

и для остановки

_bQuit = true;
NtAlertThread(_hThread);

4) однако лучший способ для моего варианта - не использовать все выделенные потоки, а использовать полный асинхронный ввод-вывод. пример кода

struct WatchFolderCB : SRWLOCK, OVERLAPPED
{
    HANDLE _hFile;
    LONG _dwRefCount;
    union {
        FILE_NOTIFY_INFORMATION fni;
        char buf[0x800];// must be aligned as FILE_NOTIFY_INFORMATION
    };
    BOOLEAN _bQuit;

    void AddRef()
    {
        InterlockedIncrementNoFence(&_dwRefCount);
    }

    void Release()
    {
        if (!InterlockedDecrement(&_dwRefCount))
        {
            delete this;
        }
    }

    WatchFolderCB() : _hFile(0), _bQuit(false), _dwRefCount(1)
    {
        InitializeSRWLock(this);
        RtlZeroMemory(static_cast<OVERLAPPED*>(this), sizeof(OVERLAPPED));
    }

    ~WatchFolderCB()
    {
        if (_hFile) CloseHandle(_hFile);
    }

    static VOID WINAPI _IoCompletionCallback(
        _In_    DWORD dwErrorCode,
        _In_    DWORD dwNumberOfBytesTransfered,
        _Inout_ LPOVERLAPPED lpOverlapped
        )
    {
        static_cast<WatchFolderCB*>(lpOverlapped)->IoCompletionCallback(
            RtlNtStatusToDosError(dwErrorCode), dwNumberOfBytesTransfered);
    }

    VOID IoCompletionCallback(DWORD dwErrorCode, DWORD NumberOfBytesTransferred)
    {
        if (dwErrorCode || !NumberOfBytesTransferred)
        {
            if (dwErrorCode != ERROR_NOTIFY_CLEANUP)
            {
                __nop();
            }
        }
        else
        {
            FILE_NOTIFY_INFORMATION* pNotify = &fni;

            ULONG NextEntryOffset = 0;
            do 
            {
                (PBYTE&)pNotify += NextEntryOffset;

                DbgPrint("%x %.*S\n", pNotify->Action, pNotify->FileNameLength / sizeof(WCHAR), pNotify->FileName);

            } while (NextEntryOffset = pNotify->NextEntryOffset);

            ReadChanges();
        }

        Release();
    }

    void ReadChanges()
    {
        AddRef();

        AcquireSRWLockExclusive(this);

        ULONG dwError = _bQuit ? ERROR_NOTIFY_CLEANUP : BOOL_TO_ERROR(
            ReadDirectoryChangesW(_hFile, buf, sizeof(buf), TRUE, FILE_NOTIFY_VALID_MASK, 0, this, 0));

        ReleaseSRWLockExclusive(this);

        if (dwError)
        {
            IoCompletionCallback(dwError, 0);
        }
    }

    ULONG Start(PCWSTR szFile)
    {
        HANDLE hFile = CreateFileW(szFile, FILE_GENERIC_READ, FILE_SHARE_VALID_FLAGS,
            NULL, OPEN_EXISTING, FILE_FLAG_BACKUP_SEMANTICS|FILE_FLAG_OVERLAPPED, NULL);

        ULONG dwError;

        if (hFile != INVALID_HANDLE_VALUE)
        {
            if (BindIoCompletionCallback(hFile, _IoCompletionCallback, 0))
            {
                _hFile = hFile;
                ReadChanges();
                return NOERROR;
            }
            dwError = GetLastError();
            CloseHandle(hFile);
        }
        else
        {
            dwError = GetLastError();
        }

        return dwError;
    }

    void Stop()
    {
        AcquireSRWLockExclusive(this);

        _bQuit = true, CloseHandle(_hFile), _hFile = 0;

        ReleaseSRWLockExclusive(this);
    }
};


void test1()
{
    if (WatchFolderCB* p = new WatchFolderCB)
    {
        if (p->Start(L"*") == NOERROR)
        {
            MessageBoxW(0,0,0,0);
            p->Stop();
        }
        p->Release();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...