MSDN Sync Named Pipes, клиент всегда переподключается - PullRequest
0 голосов
/ 19 октября 2018

Приведенный ниже пример не является рабочим, так как мой код находится в большой кодовой базе, и предоставление рабочего примера невозможно.Я вырезал и вставил раздел ниже и постараюсь объяснить, как они взаимодействуют и в чем заключается моя проблема.

createServerQue () является точкой входа для сервера.Сервер создает входной канал WAIT in -bound для подключения клиента, а затем создает поток, который запускает acceptClientsThread () , который должен ждать, пока клиенты попытаются подключиться, принять свое соединение и выделить отдельноепоток, который будет обрабатывать сообщения.

При запуске напечатанные сообщения журнала указывают, что клиент пытается подключиться к серверу, а поток-получатель сервера просто непрерывно обходит свой цикл «принятия клиентов» и выделения потоков.

Я знаю, что мой код не полностью завершен, но надеюсь, что кто-то может объяснить, в чем заключается мое недопонимание с тем, как работают эти каналы.Он находится в режиме WAIT, поэтому я ожидаю, что поток-акцептор останется заблокированным и создаст поток-получатель только один раз (для каждого клиента).Я в основном скопировал и вставил этот код из примера , найденного в .

Спасибо за помощь, обновлю этот пост, если что-то не понятно.

createServerQue(){
    String pipeName = String();
    pipeName.format("%s%d", MQUEUE_INPUT_PATHNAME, m_ourId);
    m_maxQMsgSize =45000;
    m_ourQueueId = CreateNamedPipe(
        LPCSTR(pipeName.cstr()),
        PIPE_ACCESS_INBOUND, //Since we replicate sysv ipc which is one way pipes
        PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT | PIPE_REJECT_REMOTE_CLIENTS,
        PIPE_UNLIMITED_INSTANCES,
        m_maxQMsgSize,
        m_maxQMsgSize,
        0,
        NULL
    );

    if (m_ourQueueId == INVALID_HANDLE_VALUE)
    {
        LOGE("%s: CreateNamedPipe failed, error: %d.\n", pipeName.cstr(), GetLastError());
        return -1;
    } else {
        LOGI("%d: Our NamedPipe %s was created\n", m_ourId, pipeName.cstr());

        //Kick off a thread that accepts client connection to the pipe
        //This is not neaded in the ARM implementation
        m_acceptingThreadId = CreateThread("IPCClientAcceptor", false, &sAcceptClientsThread, this);
        if (0 == m_acceptingThreadId)
        {
            LOGE("%d: Failed to create client accepting thread\n", m_ourId);
            return -1;
        }
    }
}

acceptClientsThread()
{
    bool run = true;
    while (run) {
        bool fConnected = ConnectNamedPipe(m_ourQueueId, NULL) ?
            TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);

        if (fConnected)
        {
            LOGI("Client connected, creating a processing thread.\n");

            // Create a thread for this client. 
            int clientRcvThread = CreateThread(
                "ClientReciever",
                false,
                sRecvThread,
                this);      // returns thread ID 

            if (clientRcvThread == NULL)
            {
                LOGE("CreateThread failed, GLE=%d.\n", GetLastError());
                return;
            }
            else {
                m_clients_threads.insertTail(new Client(clientRcvThread));
            }
        }
        else if (GetLastError() == ERROR_PIPE_LISTENING) {

        }else{
            // The client could not connect, so close the pipe. 
            CloseHandle(m_ourQueueId);
            LOGE("Client failed to connect to pipe, err: %d , closing\n", GetLastError());
            //m_ourQueueId = INVALID_HANDLE_VALUE;
            //run = false;
        }

        if (m_acceptingThreadId <= 0) { //Signaled to stop
            run = false;
        }
    }
}

sAcceptClientsThread(void* param)
{
    IPC* pThis = (IPC*)param;
    pThis->acceptClientsThread();
}

void recvThread()
{
    LOGI("%d: Receiving messages\n", m_ourId);

    int maxMsgSize = m_maxQMsgSize + sizeof(t_message) - 1;
    uint8_t* buff = new uint8_t[maxMsgSize];

    bool run = true;
    while (run)
    {
        LOGE("%d: Calling ReadFile(), ourId: %d ,msgSize: %d\n", m_ourId, maxMsgSize);
        DWORD msgLen = 0;
        bool fSuccess = ReadFile(
            m_ourQueueId,    // pipe handle 
            buff,    // buffer to receive reply 
            maxMsgSize,  // size of buffer 
            &msgLen,  // number of bytes read 
            NULL);    // not overlapped 

        if (msgLen == -1 || !fSuccess)
        {
            int err = GetLastError();
            LOGE("Client failed to read GLE:%d\n", err);
            run = false;
            continue;
        } else if (msgLen > 0) {
            processInboundMsg(buff, msgLen);
        }

        if (m_acceptingThreadId <= 0) { //Signaled to stop
            run = false;
        }
    }

    delete[] buff;
    LOG("%d: Shutting down reciever\n", m_ourId);
}

void sendToServer(){
    if (m_theirQueueId == INVALID_HANDLE_VALUE) {
        String pipeName = String();
        pipeName.format("%s%d", MQUEUE_INPUT_PATHNAME, m_theirId);
        LOGE("%d: Connecting to client for first time: %s\n", m_ourId ,pipeName.cstr());

        //Sometimes it takes some time for the pipe to become ready, so continue trying to connect untill we get an error or connect
        while (1) {
            m_theirQueueId = CreateFile(
                LPCSTR(pipeName.cstr()),   // pipe name 
                GENERIC_WRITE,
                0,              // no sharing 
                NULL,           // default security attributes
                OPEN_EXISTING,  // opens existing pipe 
                0,              // default attributes 
                NULL);          // no template file 

            if (m_theirQueueId != INVALID_HANDLE_VALUE) {
                LOGE("%d Connected to Clients Pipe %d\n", m_ourId, m_theirId);
                break;
            }

            if (GetLastError() != ERROR_PIPE_BUSY)
            {
                LOGE("Could not open pipe. GLE=%d\n", GetLastError());
                return -1;
            }

            // All pipe instances are busy, so wait for 5 seconds. 

            if (!WaitNamedPipe(LPCSTR(pipeName.cstr()), 5000))
            {
                LOGE("Could not open pipe: 5 second wait timed out.\n");
                return -1;
            }
        }

        DWORD dwMode = PIPE_READMODE_BYTE;
        bool fSuccess = SetNamedPipeHandleState(
            m_theirQueueId,    // pipe handle 
            &dwMode,  // new pipe mode 
            NULL,     // don't set maximum bytes 
            NULL);    // don't set maximum time 
        if (!fSuccess)
        {
            LOGE("SetNamedPipeHandleState failed. GLE=%d\n", GetLastError());
            return -1;
        }

    }

    //Handle should be valid now if it wasnt before
    if (m_theirQueueId != INVALID_HANDLE_VALUE) {
        // Send msg
        LOG("%d: Sending msg %d (%dB) to %d\n", m_ourId, msg->msgId, msg->payloadLen, m_theirId);
        LOGT("%d: Calling msgsnd(%d, %p, %d, IPC_NOWAIT)\n", m_ourId, m_theirQueueId, msg, msgLen);
        DWORD  numWritten = 0;
        bool fSuccess = WriteFile(
            m_theirQueueId,        // handle to pipe 
            msg,     // buffer to write from 
            msgLen, // number of bytes to write 
            &numWritten,   // number of bytes written 
            0);        // not overlapped I/O 
        if (!fSuccess || msgLen != numWritten)
        {
            ret = -1;
            LOGE("%d: Failed to write to client %d\n", m_ourId, GetLastError());
            m_theirId = -1;
            m_theirQueueId = INVALID_HANDLE_VALUE; // We'll re-open the queue on nextmessage
            return ret;
        }
    }
    else {
        LOGE("Handle still not valid\n");
        return -1;
    }
}
...