Простой неблокирующий сокет - PullRequest
2 голосов
/ 16 июля 2011

Я пытаюсь иметь неблокирующий ввод / вывод между сервером и клиентом. После того, как два подключены, я пытаюсь использовать fork для обработки ввода-вывода, но на стороне сервера возникает ошибка при попытке прочитать «Транспортная конечная точка не подключена», и это происходит дважды (из-за разветвления, который я предполагаю?) ,

Код сервера

//includes taken out

#define PORT "4950"
#define STDIN 0

struct sockaddr name;

void set_nonblock(int socket) {
    int flags;
    flags = fcntl(socket,F_GETFL,0);
    assert(flags != -1);
    fcntl(socket, F_SETFL, flags | O_NONBLOCK);
}


// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa) {
    if (sa->sa_family == AF_INET)
        return &(((struct sockaddr_in*)sa)->sin_addr);

    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}


int main(int agrc, char** argv) {
    int status, sock, adrlen, new_sd;

    struct addrinfo hints;
    struct addrinfo *servinfo;  //will point to the results

    //store the connecting address and size
    struct sockaddr_storage their_addr;
    socklen_t their_addr_size;

    //socket infoS
    memset(&hints, 0, sizeof hints); //make sure the struct is empty
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM; //tcp
    hints.ai_flags = AI_PASSIVE;     //use local-host address

    //get server info, put into servinfo
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) {
        fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status));
        exit(1);
    }

    //make socket
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
    if (sock < 0) {
        printf("\nserver socket failure %m", errno);
        exit(1);
    }

    //allow reuse of port
    int yes=1;
    if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }

    //unlink and bind
    unlink("127.0.0.1");
    if(bind (sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) {
        printf("\nBind error %m", errno);
        exit(1);
    }

    freeaddrinfo(servinfo);

    //listen
    if(listen(sock, 5) < 0) {
        printf("\nListen error %m", errno);
        exit(1);
    } 
    their_addr_size = sizeof(their_addr);
    //accept
    new_sd = accept(sock, (struct sockaddr*)&their_addr, &their_addr_size);
    if( new_sd < 0) {
        printf("\nAccept error %m", errno);
        exit(1);
    }

    cout<<"\nSuccessful Connection!";

    //set nonblock
    set_nonblock(new_sd);

    char* in = new char[255];
    char* out = new char[255];
    int numSent;
    int numRead;
    pid_t pid;

    fork();
    pid = getpid();

    if(pid == 0) {

        while( !(out[0] == 'q' && out[1] == 'u' && out[2] == 'i' && out[3] == 't') ) {

            fgets(out, 255, stdin);
            numSent = send(sock, out, strlen(out), 0);

            if(numSent < 0) {
                printf("\nError sending %m", errno);
                exit(1);
            }   //end error
        }   //end while
    }   //end child

    else {
        numRead = recv(sock, in, 255, 0);
        if(numRead < 0) {
            printf("\nError reading %m", errno);
            exit(1);
        }   //end error
        else {
            cout<<in;
            for(int i=0;i<255;i++)
                in[i] = '\0';

        }   //end else
    }   //end parent

    cout<<"\n\nExiting normally\n";
    return 0;
}

Код клиента

// включает вывезенные

#define PORT "4950"

struct sockaddr name;

void set_nonblock(int socket) {
    int flags;
    flags = fcntl(socket,F_GETFL,0);
    assert(flags != -1);
    fcntl(socket, F_SETFL, flags | O_NONBLOCK);
}


// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa) {
    if (sa->sa_family == AF_INET)
        return &(((struct sockaddr_in*)sa)->sin_addr);

    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}

int main(int agrc, char** argv) {
    int status, sock, adrlen;

    struct addrinfo hints;
    struct addrinfo *servinfo;  //will point to the results

    memset(&hints, 0, sizeof hints); //make sure the struct is empty
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM; //tcp
    hints.ai_flags = AI_PASSIVE;     //use local-host address

    //get server info, put into servinfo
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) {
        fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status));
        exit(1);
    }

    //make socket
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
    if (sock < 0) {
        printf("\nserver socket failure %m", errno);
        exit(1);
    }

    if(connect(sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) {
        printf("\nclient connection failure %m", errno);
        exit(1);
    }

    cout<<"\nSuccessful connection!";

    //set nonblock
    set_nonblock(sock);

    char* out = new char[255];
    char* in = new char[255];
    int numRead;
    int numSent;
    pid_t pid;

    fork();
    pid = getpid();

    if(pid == 0) {

        while( !(out[0] == 'q' && out[1] == 'u' && out[2] == 'i' && out[3] == 't') ) {

            fgets(out, 255, stdin);
            numSent = send(sock, out, strlen(out), 0);

            if(numSent < 0) {
                printf("\nError sending %m", errno);
                exit(1);
            }   //end error
        }   //end while
    }   //end child process

    else {
        while( !(in[0] == 'q' && in[1] == 'u' && in[2] == 'i' && in[3] == 't') ) {
        numRead = recv(sock, in, 255, 0);
            cout<<in;
            for(int i=0;i<255;i++)
                in[i] = '\0';
        }
    }   //end parent process

    cout<<"\n\nExiting normally\n";
    return 0;
}

Я также пытался использовать потоки для ввода / вывода. Проблема в том, что когда я запускаю программу, она как потоки не возникает. Программа просто запускается с «Успешным подключением», а затем «нормально завершается». Я поместил несколько операторов cout в циклы while (1), и они несколько раз печатали, но по какой-то причине они просто останавливаются. Я не уверен, что это проблема с моими нитями или сокетами. Код (очень похожий на приведенный выше) для этого здесь -

Сервер

//includes taken out

#define PORT "4950"
#define STDIN 0

pthread_t readthread;
pthread_t sendthread;

char* in = new char[255];
char* out = new char[255];
int numSent;
int numRead;

struct sockaddr name;
int sock, new_sd;

void* readThread(void* threadid) {

    while(1) {

        numRead = recv(new_sd, in, 255, 0);

        if(numRead > 0) {
            cout<<"\n"<<in;
            for(int i=0;i<strlen(in);i++)
                in[i] = '\0';
        }   //end if
        else if(numRead < 0) {
            printf("\nError reading %m", errno);
            exit(1);
        }

    }   //end while
}   //END READTHREAD


void* sendThread(void* threadid) {

     while(1) {

        cin.getline(out, 255);

        numSent = send(new_sd, out, 255, 0);

        if(numSent > 0) {
            for(int i=0;i<strlen(out);i++)
                out[i] = '\0';
        }   //end if
        else if(numSent < 0) {
            printf("\nError sending %m", errno);
            exit(1);
        }
    }   //end while
}   //END SENDTHREAD

void set_nonblock(int socket) {
    int flags;
    flags = fcntl(socket,F_GETFL,0);
    assert(flags != -1);
    fcntl(socket, F_SETFL, flags | O_NONBLOCK);
}

// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa) {
    if (sa->sa_family == AF_INET)
        return &(((struct sockaddr_in*)sa)->sin_addr);

    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}

int main(int agrc, char** argv) {
    int status, adrlen;

    struct addrinfo hints;
    struct addrinfo *servinfo;  //will point to the results

    //store the connecting address and size
    struct sockaddr_storage their_addr;
    socklen_t their_addr_size;

    //socket infoS
    memset(&hints, 0, sizeof hints); //make sure the struct is empty
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM; //tcp
    hints.ai_flags = AI_PASSIVE;     //use local-host address

    //get server info, put into servinfo
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) {
        fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status));
        exit(1);
    }

    //make socket
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
    if (sock < 0) {
        printf("\nserver socket failure %m", errno);
        exit(1);
    }

    //allow reuse of port
    int yes=1;
    if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) {
        perror("setsockopt");
        exit(1);
    }

    //unlink and bind
    unlink("127.0.0.1");
    if(bind (sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) {
        printf("\nBind error %m", errno);
        exit(1);
    }

    freeaddrinfo(servinfo);

    //listen
    if(listen(sock, 5) < 0) {
        printf("\nListen error %m", errno);
        exit(1);
    }

    their_addr_size = sizeof(their_addr);
    //accept
    new_sd = accept(sock, (struct sockaddr*)&their_addr, &their_addr_size);
    if( new_sd < 0) {
        printf("\nAccept error %m", errno);
        exit(1);
    }  

    cout<<"\nSuccessful Connection!";

    //set nonblock
    set_nonblock(new_sd);

    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    pthread_create(&readthread, &attr, readThread, (void*)0);
    pthread_create(&sendthread, &attr, sendThread, (void*)1);

    cout<<"\n\nExiting normally\n";
    return 0;
}

Клиент

#define PORT "4950"

pthread_t readthread;
pthread_t sendthread;
char* in = new char[255];
char* out = new char[255];
int numSent;
int numRead;
struct sockaddr name;
int sock;

void* readThread(void* threadid) {

    while(1) {

        numRead = recv(sock, in, 255, 0);

        if(numRead > 0) {
            cout<<"\n"<<in;
            for(int i=0;i<strlen(in);i++)
                in[i] = '\0';
        }   //end if
        else if(numRead < 0) {
            printf("\nError reading %m", errno);
            exit(1);
        }
    }   //end while
}   //END READTHREAD

void* sendThread(void* threadid) {

    while(1) {

        cin.getline(out, 255);
        numSent = send(sock, out, 255, 0);

        if(numSent > 0) {
            for(int i=0;i<strlen(out);i++)
                out[i] = '\0';
        }   //end if
        else if(numSent < 0) {
            printf("\nError sending %m", errno);
            exit(1);
        }

    }   //end while
}   //END SENDTHREAD

void set_nonblock(int socket) {
    int flags;
    flags = fcntl(socket,F_GETFL,0);
    assert(flags != -1);
    fcntl(socket, F_SETFL, flags | O_NONBLOCK);
}


// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa) {
    if (sa->sa_family == AF_INET)
        return &(((struct sockaddr_in*)sa)->sin_addr);

    return &(((struct sockaddr_in6*)sa)->sin6_addr);
}

int main(int agrc, char** argv) {
    int status, adrlen;

    struct addrinfo hints;
    struct addrinfo *servinfo;  //will point to the results

    memset(&hints, 0, sizeof hints); //make sure the struct is empty
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM; //tcp
    hints.ai_flags = AI_PASSIVE;     //use local-host address

    //get server info, put into servinfo
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) {
        fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status));
        exit(1);
    }

    //make socket
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
    if (sock < 0) {
        printf("\nserver socket failure %m", errno);
        exit(1);
    }

    if(connect(sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) {
        printf("\nclient connection failure %m", errno);
        exit(1);
    }

    cout<<"\nSuccessful connection!";

    //set nonblock
    set_nonblock(sock);

    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);

    pthread_create(&readthread, &attr, readThread, (void*)0);
    pthread_create(&sendthread, &attr, sendThread, (void*)1);

    cout<<"\n\nExiting normally\n";
    return 0;
}

Я прошу прощения за длинный пост, но я занимался этим уже несколько дней и не уверен, как поступить. Я попытался использовать select (), но это кажется немного для одного клиента и серверного ввода-вывода. Если кто-то может указать на то, что может идти не так, как указано выше, или на любые другие советы (или я просто ошибаюсь в выборе :)), я был бы очень признателен.

Ответы [ 3 ]

4 голосов
/ 16 июля 2011

Я думаю, что ваша проблема здесь:

numSent = send(sock, out, strlen(out), 0);

и здесь:

numRead = recv(sock, in, 255, 0);

, где вы пытаетесь набрать send и recv на вашем сокете прослушивания. Вам нужно использовать new_sd, который является допустимым сокетом. См. Справочную страницу для accept (2) .

2 голосов
/ 16 июля 2011

Прошло много времени с тех пор, как я занимался программированием на POSIX, но следующий код кажется немного странным:

fork();
pid = getpid();
if(pid == 0) {

IIRC, вы должны проверить возвращаемое значение fork: 0 длядочерний процесс, pid child в родительском процессе.

Не рассматривал остальную часть вашего кода:)

1 голос
/ 16 июля 2011

Я посмотрел на первые два кода сервера и клиента. К сожалению, вы разработали свою программу по ошибке. Предполагается, что сервер выполняет следующие действия (в случае нескольких процессов):

  1. Создать сокет
  2. Привязка к номеру порта
  3. Переход в пассивный режим (с помощью функции listen ()).
  4. Перейти в цикл, чтобы принять входящее соединение. Здесь sock отвечает ТОЛЬКО за прослушивание нового соединения и не отвечает за отправку / запись, как вы.
    • принять новое соединение. принимаем ()
    • создать новый процесс
    • Новый процесс отвечает за отправку / запись. Родительский процесс должен вернуться к шагу 4 для прослушивания нового соединения.

Вот псевдокод

while (1) {
    new_sd = accept(....);
    if (new_sd < 0) continue; // or quit
    if (fork() == 0) {
         // do sendin and receiving USING new_sd
         exit(0);  // child terminates here
    }
}

ОБНОВЛЕНИЕ: поскольку вы ищете неблокирующий сценарий, я бы порекомендовал вам использовать системные вызовы select () или poll ().

Мне кажется, что вы хотите создать два процесса, один из которых предназначен для отправки, а другой - для получения. Если это так, то вам не нужно устанавливать new_sd в неблокирующий режим, так как эти два процесса работают одновременно. Если вы планируете это сделать, то дочерний процесс создаст другой процесс, так что первый ребенок отправляет, а второй - получает. Следующим образом:

while (1) {
    new_sd = accept(....);
    if (new_sd < 0) continue; // or quit
    if (fork() == 0) {
         pid = fork();
         if (pid == 0) then do_sending(new_sd);
         else do_receiving(new_sd);
         exit(0);  // child terminates here
    }
}
...