Я работаю над простым клиентом асинхронного сокета, построенным поверх winsock с портами завершения io, иногда он работает, а иногда блокирует со странным поведением
это дизайн асинхронного процесса:
- существуют глобальные потоки iocp, работающие для получения пакетов завершения и уведомления отправителей
- отправители используют процедуру QueueUserApc для выполнения вызова io в контексте одного из ожидающих потоков
- подпрограмма apc выполняет асинхронный вызов io, и поток официанта должен получить результат и уведомить об этом эмитента
Это моя простая оболочка для портов завершения io
template <class T, auto CloseFn>
struct HandleDeleter
{
using pointer = T;
void operator ()(pointer handle) { CloseFn(handle); }
};
using Handle = std::unique_ptr<void, HandleDeleter<HANDLE, CloseHandle>>;
struct IoContext : public OVERLAPPED
{
IoContext()
{
memset(this, 0, sizeof(OVERLAPPED));
}
};
struct CompletionResult
{
CompletionResult() : BytesTransferred(0), Key(0), Ctx(nullptr) {}
CompletionResult(unsigned long bytes, uintptr_t key, IoContext* ctx) : BytesTransferred(bytes), Key(key), Ctx(ctx) {}
unsigned long BytesTransferred;
uintptr_t Key;
IoContext *Ctx;
};
class IoPort
{
Handle port_handle;
public:
IoPort()
{}
IoPort(Handle&& port) : port_handle(std::move(port))
{}
IoPort(IoPort&& rhs) : port_handle(std::move(rhs.port_handle))
{}
IoPort& operator=(IoPort&& rhs)
{
port_handle = std::move(rhs.port_handle);
return *this;
}
bool Create(unsigned int max_threads = 0);
bool AddToPort(Handle& file, uintptr_t key);
bool AddToPort(SOCKET sock, uintptr_t key);
bool GetResultOrApc(CompletionResult& result, DWORD WaitTimeMs);
void Close()
{
port_handle.reset();
}
};
bool IoPort::Create(unsigned int max_threads)
{
port_handle.reset(CreateIoCompletionPort(INVALID_HANDLE_VALUE, nullptr, 0, max_threads));
return bool(port_handle);
}
bool IoPort::AddToPort(Handle & file, uintptr_t key)
{
return CreateIoCompletionPort(file.get(), port_handle.get(), key, 0) != nullptr;
}
bool IoPort::AddToPort(SOCKET sock, uintptr_t key)
{
return CreateIoCompletionPort((HANDLE)sock, port_handle.get(), key, 0) != nullptr;
}
bool IoPort::GetResultOrApc(CompletionResult & result, DWORD WaitTimeMs)
{
OVERLAPPED_ENTRY entry = { 0 };
DWORD count = 0;
BOOL wait_result = GetQueuedCompletionStatusEx(port_handle.get(), &entry, 1, &count, WaitTimeMs, TRUE);
if (entry.lpOverlapped)
{
result.Key = entry.lpCompletionKey;
result.BytesTransferred = entry.dwNumberOfBytesTransferred;
result.Ctx = (IoContext*)entry.lpOverlapped;
}
return wait_result;
}
тогда это глобальные потоки iocp, которые запускаются перед входом в основную функцию
struct IOCPThreads
{
iocp::IoPort Port;
std::vector<std::thread> IoThreads;
bool Exit = false;
IOCPThreads()
{
if (!Port.Create())
std::cout << "[!] failed to open the port , error : " << GetLastError() << std::endl;
auto num = std::thread::hardware_concurrency();
IoThreads.reserve(num);
std::stringstream thd_ids;
thd_ids << "threads ids : { ";
for (auto i : range(0u, num))
{
IoThreads.emplace_back(std::thread(&IOCPThreads::IoThread, this));
thd_ids << IoThreads[i].get_id() << ", ";
}
auto str = thd_ids.str();
str.pop_back();
str.pop_back();
str += " }";
std::cout << str << std::endl;
}
~IOCPThreads()
{
Exit = true;
for (auto& thd : IoThreads)
if (thd.joinable())
thd.join();
}
void IoThread()
{
while (!Exit)
{
iocp::CompletionResult result;
Port.GetResultOrApc(result, 10);
if (GetLastError() == WAIT_IO_COMPLETION)
std::cout << "[*] received an apc" << std::endl; // sometimes print and sometimes no
if (!result.Ctx)
continue;
DWORD transeferred, Flags;
std::unique_ptr<ClientIoCtx> ctx((ClientIoCtx*)result.Ctx);
WSAGetOverlappedResult(ctx->sock->GetHandle().get(), ctx.get(), &transeferred, FALSE, &Flags);
int WsaError = WSAGetLastError();
if (ctx->op == ClientIoOp::Recv)
{
std::unique_ptr<ClientIoRecvCtx> RecvCtx = unique_static_cast<ClientIoRecvCtx>(ctx);
RecvCtx->OnResult(*RecvCtx->sock, WsaError, std::span{ RecvCtx->buffer, RecvCtx->buffer + RecvCtx->length }, transeferred);
}
}
}
};
IOCPThreads ClientIoThreads;
, и это реализация сокета:
enum class async_status_code : unsigned int
{
Pending,
Finished,
Error,
};
struct AsyncResult
{
async_status_code code;
unsigned int transferred;
AsyncResult() : code(async_status_code::Error), transferred(0)
{}
AsyncResult(async_status_code c, unsigned int s) : code(c), transferred(s)
{}
bool IsPending() const
{
return code == async_status_code::Pending;
}
bool IsFinished() const
{
return code == async_status_code::Finished;
}
bool IsFailed() const
{
return code == async_status_code::Error;
}
};
using AsyncSendFunc = std::function<void(Socket& sock, int ErrorCode, std::span<const char> Buffer, DWORD transeferred)>;
enum class SendRecvFlags
{
UnSpecefied = 0,
MsgDontRoute = 0x4,
MsgOOB = 0x1,
MsgPeek = 0x2,
MsgWaitAll = 0x8,
MsgPushImmediate = 0x20,
};
enum class ClientIoOp
{
Send,
};
struct ApcResult
{
async_status_code code;
DWORD transeferred;
int error;
};
struct ClientIoCtx : public IoContext
{
Socket *sock;
ClientIoOp op;
promise<ApcResult> notifier;
};
struct ClientIoSendCtx : public ClientIoCtx
{
ClientIoSendCtx() { op = ClientIoOp::Send; }
const char *buffer;
int length;
SendRecvFlags flags;
AsyncSendFunc OnResult;
};
AsyncResult LIB_NAMESPACE::net::Socket::AsyncSend(const char * buffer, int length, io::IoContext & IoCtx, SendRecvFlags flags)
{
WSABUF buff_vec{ length, (char*)buffer };
DWORD sent = 0;
if (WSASend(sock.get(), &buff_vec, 1, &sent, (DWORD)flags, &IoCtx, nullptr) == 0)
return AsyncResult(async_status_code::Finished, sent);
else if (WSAGetLastError() == WSA_IO_PENDING)
return AsyncResult(async_status_code::Pending, sent);
return AsyncResult();
}
AsyncResult LIB_NAMESPACE::net::Socket::AsyncSend(const char * buffer, int length, const AsyncSendFunc & OnSend, SendRecvFlags flags)
{
ClientIoSendCtx *ctx = new ClientIoSendCtx();
ctx->sock = this;
ctx->buffer = buffer;
ctx->length = length;
ctx->flags = flags;
ctx->OnResult = OnSend;
auto fut = ctx->notifier.get_future(); // notifier is a promise
auto& thd = ClientIoThreads.IoThreads[RandomUInt(0, ClientIoThreads.IoThreads.size() - 1)];
std::cout << "[*] launchin the apc routine in thread : " << thd.get_id() << std::endl; // it's always one of the io threads
auto dwRes = QueueUserAPC(Socket::ApcRoutine, thd.native_handle(), (ULONG_PTR)ctx);
if (!dwRes) // doesn't happen
{
delete ctx;
std::cout << "QueueUserAPC failed with error " << GetLastError() << std::endl;
return AsyncResult();
}
auto result = fut.get(); // --> blocks here because the apc routine doesn't start
WSASetLastError(result.error); // the wsa error is thread local so I transfer it from the apc thread to this thread
return AsyncResult(result.code, result.transeferred);
}
void ApcRoutine(unsigned long long Param)
{
std::cout << "[*] ApcRoutine started" << std::endl; // sometimes is printed and sometimes no
std::unique_ptr<ClientIoCtx> ctx((ClientIoCtx*)Param);
if (ctx->op == ClientIoOp::Send)
{
std::unique_ptr<ClientIoSendCtx> SendCtx = unique_static_cast<ClientIoSendCtx>(ctx);
auto async_result = SendCtx->sock->AsyncSend(SendCtx->buffer, SendCtx->length, *SendCtx, SendCtx->flags);
ApcResult apc_result;
apc_result.code = async_result.code;
apc_result.transeferred = async_result.transferred;
apc_result.error = WSAGetLastError();
SendCtx->notifier.set_value(apc_result);
if (apc_result.code != async_status_code::Error)
SendCtx.release();
}
}
проблема в том, что иногда QueuUserApc не дает сбоя, ноподпрограмма apc не запускается, поэтому поток эмитента блокирует ожидание с будущим :: get
Я проверил GetЗначение LasError () каждый раз после вызова QueuUserApc и всегда равно 0, так что здесь не так?