Приведенный ниже пример не является рабочим, так как мой код находится в большой кодовой базе, и предоставление рабочего примера невозможно.Я вырезал и вставил раздел ниже и постараюсь объяснить, как они взаимодействуют и в чем заключается моя проблема.
createServerQue () является точкой входа для сервера.Сервер создает входной канал WAIT in -bound для подключения клиента, а затем создает поток, который запускает acceptClientsThread () , который должен ждать, пока клиенты попытаются подключиться, принять свое соединение и выделить отдельноепоток, который будет обрабатывать сообщения.
При запуске напечатанные сообщения журнала указывают, что клиент пытается подключиться к серверу, а поток-получатель сервера просто непрерывно обходит свой цикл «принятия клиентов» и выделения потоков.
Я знаю, что мой код не полностью завершен, но надеюсь, что кто-то может объяснить, в чем заключается мое недопонимание с тем, как работают эти каналы.Он находится в режиме WAIT, поэтому я ожидаю, что поток-акцептор останется заблокированным и создаст поток-получатель только один раз (для каждого клиента).Я в основном скопировал и вставил этот код из примера , найденного в .
Спасибо за помощь, обновлю этот пост, если что-то не понятно.
createServerQue(){
String pipeName = String();
pipeName.format("%s%d", MQUEUE_INPUT_PATHNAME, m_ourId);
m_maxQMsgSize =45000;
m_ourQueueId = CreateNamedPipe(
LPCSTR(pipeName.cstr()),
PIPE_ACCESS_INBOUND, //Since we replicate sysv ipc which is one way pipes
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS,
PIPE_UNLIMITED_INSTANCES,
m_maxQMsgSize,
m_maxQMsgSize,
0,
NULL
);
if (m_ourQueueId == INVALID_HANDLE_VALUE)
{
LOGE("%s: CreateNamedPipe failed, error: %d.\n", pipeName.cstr(), GetLastError());
return -1;
} else {
LOGI("%d: Our NamedPipe %s was created\n", m_ourId, pipeName.cstr());
//Kick off a thread that accepts client connection to the pipe
//This is not neaded in the ARM implementation
m_acceptingThreadId = CreateThread("IPCClientAcceptor", false, &sAcceptClientsThread, this);
if (0 == m_acceptingThreadId)
{
LOGE("%d: Failed to create client accepting thread\n", m_ourId);
return -1;
}
}
}
acceptClientsThread()
{
bool run = true;
while (run) {
bool fConnected = ConnectNamedPipe(m_ourQueueId, NULL) ?
TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);
if (fConnected)
{
LOGI("Client connected, creating a processing thread.\n");
// Create a thread for this client.
int clientRcvThread = CreateThread(
"ClientReciever",
false,
sRecvThread,
this); // returns thread ID
if (clientRcvThread == NULL)
{
LOGE("CreateThread failed, GLE=%d.\n", GetLastError());
return;
}
else {
m_clients_threads.insertTail(new Client(clientRcvThread));
}
}
else if (GetLastError() == ERROR_PIPE_LISTENING) {
}else{
// The client could not connect, so close the pipe.
CloseHandle(m_ourQueueId);
LOGE("Client failed to connect to pipe, err: %d , closing\n", GetLastError());
//m_ourQueueId = INVALID_HANDLE_VALUE;
//run = false;
}
if (m_acceptingThreadId <= 0) { //Signaled to stop
run = false;
}
}
}
sAcceptClientsThread(void* param)
{
IPC* pThis = (IPC*)param;
pThis->acceptClientsThread();
}
void recvThread()
{
LOGI("%d: Receiving messages\n", m_ourId);
int maxMsgSize = m_maxQMsgSize + sizeof(t_message) - 1;
uint8_t* buff = new uint8_t[maxMsgSize];
bool run = true;
while (run)
{
LOGE("%d: Calling ReadFile(), ourId: %d ,msgSize: %d\n", m_ourId, maxMsgSize);
DWORD msgLen = 0;
bool fSuccess = ReadFile(
m_ourQueueId, // pipe handle
buff, // buffer to receive reply
maxMsgSize, // size of buffer
&msgLen, // number of bytes read
NULL); // not overlapped
if (msgLen == -1 || !fSuccess)
{
int err = GetLastError();
LOGE("Client failed to read GLE:%d\n", err);
run = false;
continue;
} else if (msgLen > 0) {
processInboundMsg(buff, msgLen);
}
if (m_acceptingThreadId <= 0) { //Signaled to stop
run = false;
}
}
delete[] buff;
LOG("%d: Shutting down reciever\n", m_ourId);
}
void sendToServer(){
if (m_theirQueueId == INVALID_HANDLE_VALUE) {
String pipeName = String();
pipeName.format("%s%d", MQUEUE_INPUT_PATHNAME, m_theirId);
LOGE("%d: Connecting to client for first time: %s\n", m_ourId ,pipeName.cstr());
//Sometimes it takes some time for the pipe to become ready, so continue trying to connect untill we get an error or connect
while (1) {
m_theirQueueId = CreateFile(
LPCSTR(pipeName.cstr()), // pipe name
GENERIC_WRITE,
0, // no sharing
NULL, // default security attributes
OPEN_EXISTING, // opens existing pipe
0, // default attributes
NULL); // no template file
if (m_theirQueueId != INVALID_HANDLE_VALUE) {
LOGE("%d Connected to Clients Pipe %d\n", m_ourId, m_theirId);
break;
}
if (GetLastError() != ERROR_PIPE_BUSY)
{
LOGE("Could not open pipe. GLE=%d\n", GetLastError());
return -1;
}
// All pipe instances are busy, so wait for 5 seconds.
if (!WaitNamedPipe(LPCSTR(pipeName.cstr()), 5000))
{
LOGE("Could not open pipe: 5 second wait timed out.\n");
return -1;
}
}
DWORD dwMode = PIPE_READMODE_BYTE;
bool fSuccess = SetNamedPipeHandleState(
m_theirQueueId, // pipe handle
&dwMode, // new pipe mode
NULL, // don't set maximum bytes
NULL); // don't set maximum time
if (!fSuccess)
{
LOGE("SetNamedPipeHandleState failed. GLE=%d\n", GetLastError());
return -1;
}
}
//Handle should be valid now if it wasnt before
if (m_theirQueueId != INVALID_HANDLE_VALUE) {
// Send msg
LOG("%d: Sending msg %d (%dB) to %d\n", m_ourId, msg->msgId, msg->payloadLen, m_theirId);
LOGT("%d: Calling msgsnd(%d, %p, %d, IPC_NOWAIT)\n", m_ourId, m_theirQueueId, msg, msgLen);
DWORD numWritten = 0;
bool fSuccess = WriteFile(
m_theirQueueId, // handle to pipe
msg, // buffer to write from
msgLen, // number of bytes to write
&numWritten, // number of bytes written
0); // not overlapped I/O
if (!fSuccess || msgLen != numWritten)
{
ret = -1;
LOGE("%d: Failed to write to client %d\n", m_ourId, GetLastError());
m_theirId = -1;
m_theirQueueId = INVALID_HANDLE_VALUE; // We'll re-open the queue on nextmessage
return ret;
}
}
else {
LOGE("Handle still not valid\n");
return -1;
}
}