QueuUserAPC иногда не запускает процедуру apc в потоке, ожидающем с GetQueuedCompletionStatusEx - PullRequest
0 голосов
/ 06 октября 2019

Я работаю над простым клиентом асинхронного сокета, построенным поверх winsock с портами завершения io, иногда он работает, а иногда блокирует со странным поведением

это дизайн асинхронного процесса:

  • существуют глобальные потоки iocp, работающие для получения пакетов завершения и уведомления отправителей
  • отправители используют процедуру QueueUserApc для выполнения вызова io в контексте одного из ожидающих потоков
  • подпрограмма apc выполняет асинхронный вызов io, и поток официанта должен получить результат и уведомить об этом эмитента

Это моя простая оболочка для портов завершения io

template <class T, auto CloseFn>
struct HandleDeleter
{
    using pointer = T;
    void operator ()(pointer handle) { CloseFn(handle); }
};

using Handle = std::unique_ptr<void, HandleDeleter<HANDLE, CloseHandle>>;

struct IoContext : public OVERLAPPED
{
    IoContext()
    {
        memset(this, 0, sizeof(OVERLAPPED));
    }
};

struct CompletionResult
{
    CompletionResult() : BytesTransferred(0), Key(0), Ctx(nullptr) {}
    CompletionResult(unsigned long bytes, uintptr_t key, IoContext* ctx) : BytesTransferred(bytes), Key(key), Ctx(ctx) {}
    unsigned long BytesTransferred;
    uintptr_t Key;
    IoContext *Ctx;
};

class IoPort
{
    Handle port_handle;
    public:
    IoPort()
    {}
    IoPort(Handle&& port) : port_handle(std::move(port))
    {}
    IoPort(IoPort&& rhs) : port_handle(std::move(rhs.port_handle))
    {}

    IoPort& operator=(IoPort&& rhs)
    {
        port_handle = std::move(rhs.port_handle);
        return *this;
    }

    bool Create(unsigned int max_threads = 0);

    bool AddToPort(Handle& file, uintptr_t key);

    bool AddToPort(SOCKET sock, uintptr_t key);

    bool GetResultOrApc(CompletionResult& result, DWORD WaitTimeMs);

    void Close()
    {
        port_handle.reset();
    }

};

bool IoPort::Create(unsigned int max_threads)
{
    port_handle.reset(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, max_threads));
    return bool(port_handle);
}

bool IoPort::AddToPort(Handle & file, uintptr_t key)
{
    return CreateIoCompletionPort(file.get(), port_handle.get(), key, 0) != nullptr;
}

bool IoPort::AddToPort(SOCKET sock, uintptr_t key)
{
    return CreateIoCompletionPort((HANDLE)sock, port_handle.get(), key, 0) != nullptr;
}

bool IoPort::GetResultOrApc(CompletionResult & result, DWORD WaitTimeMs)
{
    OVERLAPPED_ENTRY entry = { 0 };
    DWORD count = 0;
    BOOL wait_result = GetQueuedCompletionStatusEx(port_handle.get(), &entry, 1, &count, WaitTimeMs, TRUE);
    if (entry.lpOverlapped)
    {
        result.Key = entry.lpCompletionKey;
        result.BytesTransferred = entry.dwNumberOfBytesTransferred;
        result.Ctx = (IoContext*)entry.lpOverlapped;
    }
    return wait_result;
}

тогда это глобальные потоки iocp, которые запускаются перед входом в основную функцию

struct IOCPThreads
{
    iocp::IoPort Port;

    std::vector<std::thread> IoThreads;

    bool Exit = false;

    IOCPThreads()
    {
        if (!Port.Create())
        std::cout << "[!] failed to open the port , error : " << GetLastError() << std::endl;
        auto num = std::thread::hardware_concurrency();
        IoThreads.reserve(num);
        std::stringstream thd_ids;
        thd_ids << "threads ids : { ";
        for (auto i : range(0u, num))
        {
        IoThreads.emplace_back(std::thread(&IOCPThreads::IoThread, this));
        thd_ids << IoThreads[i].get_id() << ", ";
        }
        auto str = thd_ids.str();
        str.pop_back();
        str.pop_back();
        str += " }";
        std::cout << str << std::endl;
    }

    ~IOCPThreads()
    {
        Exit = true;
        for (auto& thd : IoThreads)
        if (thd.joinable())
            thd.join();
    }

    void IoThread()
    {
        while (!Exit)
        {
        iocp::CompletionResult result;
        Port.GetResultOrApc(result, 10);
        if (GetLastError() == WAIT_IO_COMPLETION)
            std::cout << "[*] received an apc" << std::endl; // sometimes print and sometimes no
        if (!result.Ctx)
            continue;

        DWORD transeferred, Flags;
        std::unique_ptr<ClientIoCtx> ctx((ClientIoCtx*)result.Ctx);
        WSAGetOverlappedResult(ctx->sock->GetHandle().get(), ctx.get(), &transeferred, FALSE, &Flags);
        int WsaError = WSAGetLastError();

        if (ctx->op == ClientIoOp::Recv)
        {
            std::unique_ptr<ClientIoRecvCtx> RecvCtx = unique_static_cast<ClientIoRecvCtx>(ctx);
            RecvCtx->OnResult(*RecvCtx->sock, WsaError, std::span{ RecvCtx->buffer, RecvCtx->buffer + RecvCtx->length }, transeferred);
        }
        }
    }

};

IOCPThreads ClientIoThreads;

, и это реализация сокета:

enum class async_status_code : unsigned int
{
    Pending,
    Finished,
    Error,
};

struct AsyncResult
{
    async_status_code code;
    unsigned int transferred;
    AsyncResult() : code(async_status_code::Error), transferred(0)
    {}
    AsyncResult(async_status_code c, unsigned int s) : code(c), transferred(s)
    {}
    bool IsPending() const
    {
        return code == async_status_code::Pending;
    }
    bool IsFinished() const
    {
        return code == async_status_code::Finished;
    }
    bool IsFailed() const
    {
        return code == async_status_code::Error;
    }
};

using AsyncSendFunc = std::function<void(Socket& sock, int ErrorCode, std::span<const char> Buffer, DWORD transeferred)>;

enum class SendRecvFlags
{
    UnSpecefied = 0,
    MsgDontRoute = 0x4,
    MsgOOB = 0x1,
    MsgPeek = 0x2,
    MsgWaitAll = 0x8,
    MsgPushImmediate = 0x20,
};

enum class ClientIoOp
{
    Send,
};

struct ApcResult
{
    async_status_code code;
    DWORD transeferred;
    int error;
};

struct ClientIoCtx : public IoContext
{
    Socket *sock;
    ClientIoOp op;
    promise<ApcResult> notifier;
};

struct ClientIoSendCtx : public ClientIoCtx
{
    ClientIoSendCtx() { op = ClientIoOp::Send; }
    const char *buffer;
    int length;
    SendRecvFlags flags;
    AsyncSendFunc OnResult;
};

AsyncResult LIB_NAMESPACE::net::Socket::AsyncSend(const char * buffer, int length, io::IoContext & IoCtx, SendRecvFlags flags)
{
    WSABUF buff_vec{ length, (char*)buffer };
    DWORD sent = 0;
    if (WSASend(sock.get(), &buff_vec, 1, &sent, (DWORD)flags, &IoCtx, nullptr) == 0)
        return AsyncResult(async_status_code::Finished, sent);
    else if (WSAGetLastError() == WSA_IO_PENDING)
        return AsyncResult(async_status_code::Pending, sent);
    return AsyncResult();
}

AsyncResult LIB_NAMESPACE::net::Socket::AsyncSend(const char * buffer, int length, const AsyncSendFunc & OnSend, SendRecvFlags flags)
{   
    ClientIoSendCtx *ctx = new ClientIoSendCtx();
    ctx->sock = this;
    ctx->buffer = buffer;
    ctx->length = length;
    ctx->flags = flags;
    ctx->OnResult = OnSend;
    auto fut = ctx->notifier.get_future(); // notifier is a promise

    auto& thd = ClientIoThreads.IoThreads[RandomUInt(0, ClientIoThreads.IoThreads.size() - 1)];
    std::cout << "[*] launchin the apc routine in thread : " << thd.get_id() << std::endl; // it's always one of the io threads
    auto dwRes = QueueUserAPC(Socket::ApcRoutine, thd.native_handle(), (ULONG_PTR)ctx);
    if (!dwRes) // doesn't happen
    {
        delete ctx;
        std::cout << "QueueUserAPC failed with error " << GetLastError() << std::endl;
        return AsyncResult();
    }

    auto result = fut.get(); // --> blocks here because the apc routine doesn't start

    WSASetLastError(result.error); // the wsa error is thread local so I transfer it from the apc thread to this thread

    return AsyncResult(result.code, result.transeferred);
}

void ApcRoutine(unsigned long long Param)
{
    std::cout << "[*] ApcRoutine started" << std::endl; // sometimes is printed and sometimes no

    std::unique_ptr<ClientIoCtx> ctx((ClientIoCtx*)Param);


    if (ctx->op == ClientIoOp::Send)
    {
        std::unique_ptr<ClientIoSendCtx> SendCtx = unique_static_cast<ClientIoSendCtx>(ctx);
        auto async_result = SendCtx->sock->AsyncSend(SendCtx->buffer, SendCtx->length, *SendCtx, SendCtx->flags);
        ApcResult apc_result;
        apc_result.code = async_result.code;
        apc_result.transeferred = async_result.transferred;
        apc_result.error = WSAGetLastError();
        SendCtx->notifier.set_value(apc_result);
        if (apc_result.code != async_status_code::Error)
            SendCtx.release();
    }
}

проблема в том, что иногда QueuUserApc не дает сбоя, ноподпрограмма apc не запускается, поэтому поток эмитента блокирует ожидание с будущим :: get

Я проверил GetЗначение LasError () каждый раз после вызова QueuUserApc и всегда равно 0, так что здесь не так?

...