TCP-сервер, который может обрабатывать два разных запроса на запись клиента, не блокируя друг друга - PullRequest
0 голосов
/ 14 февраля 2019

Я пытаюсь написать TCP-сервер, который может обрабатывать двух разных клиентов.У меня есть запросчик и поставщик клиента.Поставщик является многопоточным и добавляет и удаляет новые службы на сервере.Каждый раз, когда новая служба добавляется или удаляется, она должна отправлять ее на сервер, и сервер будет печатать обновление.Клиент-запросчик позволяет пользователю войти в службу, а затем проверяет сервер на наличие службы.

Проблема, с которой я сталкиваюсь, связана с функцией recv().Я вызываю его дважды в моей программе, один раз для чтения из клиента-производителя, а другой для чтения из запросчика.Проблема в том, что сервер получает только одно сообщение, а затем зависает.Он должен обновляться каждый раз при запуске потоков.Кажется, проблема возникает из-за того, что второй recv() вызов блокирует его, потому что он ожидает запроса.Я попытался сделать второй вызов recv() неблокирующим, используя флаг события неблокирования (MSG_DONTWAIT), но это не решило мою проблему.

Как написать сервер TCP, который может обрабатывать два разных запроса записи клиентаи мешать им блокировать друг друга?Мой код ниже.

Клиент 1 - поставщик

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>

#define PORT 8080

int SIZE = 100;
int counter = 0;
int semaphore = 1; //set to false

struct values 
       {
       int serviceArray[100]; 
       int portArray[100];  
       }input;

void callServer()
{
struct sockaddr_in address; 
    int sock = 0, valread; 
    struct sockaddr_in serv_addr; 

    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
    { 
        printf("\n Socket creation error \n"); 

    } 

    memset(&serv_addr, '0', sizeof(serv_addr)); 

    serv_addr.sin_family = AF_INET; 
    serv_addr.sin_port = htons(PORT); 

    // Convert IPv4 and IPv6 addresses from text to binary form 
    if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0)  
    { 
        printf("\nInvalid address/ Address not supported \n"); 

    } 

    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) 
    { 
        printf("\nConnection Failed \n"); 

    } 
    send(sock , &input , sizeof(input) , 0 ); 


} 






int arraySearch(int number){
    int i=0; 
    for(i=0; i< 100; i++){
        if(input.serviceArray[i] == number)
        {
            return 0; //is found
        }

    }

    return 1;
}


void *createService()
{

    while(counter < SIZE)
    {
        sleep(2);
        if(semaphore ==1)
        {

            int randomValue = rand() % SIZE;

            if(arraySearch(randomValue) == 1)
            {
                input.serviceArray[counter] = randomValue;
                input.portArray[counter] = randomValue + PORT;
                printf("Thread 1 is adding service number: %d and port number: %d\n", input.serviceArray[counter], input.portArray[counter]);
                semaphore = 0;//unlock
                counter = counter + 1;
                callServer();

            }
        }
    }
}
void *removeService()
{

    while(counter < SIZE)
    {
        sleep(4);
        if(semaphore ==0)
        {
            printf("Thread 2 is removing service number: %d and port number: %d\n", input.serviceArray[counter - 1], input.portArray[counter - 1]);
            input.serviceArray[counter -1] = 0;
            input.portArray[counter - 1] = 0;
            semaphore = 1; //lock
            callServer();

        }
    }
}



int main(void)
{

    //create threads
        pthread_t thread_id1, thread_id2;  


        pthread_create(&thread_id1, NULL, createService, NULL);
        pthread_create(&thread_id2, NULL, removeService, NULL); 


        pthread_join(thread_id1, NULL);  
        pthread_join(thread_id2, NULL); 



}

Клиент 2 - запросчик

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>
#define PORT 8080

void callServer(int serviceNum)
{
struct sockaddr_in address; 
    int sock = 0, valread; 
    struct sockaddr_in serv_addr; 

    if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) 
    { 
        printf("\n Socket creation error \n"); 

    } 

    memset(&serv_addr, '0', sizeof(serv_addr)); 

    serv_addr.sin_family = AF_INET; 
    serv_addr.sin_port = htons(PORT); 

    // Convert IPv4 and IPv6 addresses from text to binary form 
    if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr)<=0)  
    { 
        printf("\nInvalid address/ Address not supported \n"); 

    } 

    if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) 
    { 
        printf("\nConnection Failed \n"); 

    } 
    send(sock , &serviceNum , sizeof(serviceNum) , 0 ); 


} 


void main(){
int serviceNum;

printf("Which service would you like to run?\n");
scanf("%d",&serviceNum);
printf("You entered: %d", serviceNum);
callServer(serviceNum);

}

Сервер

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>

#define PORT 8080 

 int sockfd; 
 int serviceNum;

    struct sockaddr_in servaddr, cliaddr; 

struct values {
       int serviceArray[100]; 
       int portArray[100];
       }input;






int main()
{

    int server_fd, server_fd2, new_socket, valread, valread2; 

    struct sockaddr_in address; 

    int opt = 1; 

    int addrlen = sizeof(address); 





    // Creating socket file descriptor 

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 



    // Forcefully attaching socket to the port 8080 

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | 0, 

                                                  &opt, sizeof(opt))) 

    { 

        perror("setsockopt"); 

        exit(EXIT_FAILURE); 

    } 

    address.sin_family = AF_INET; 

    address.sin_addr.s_addr = INADDR_ANY; 

    address.sin_port = htons( PORT ); 



    // Forcefully attaching socket to the port 8080 

    if (bind(server_fd, (struct sockaddr *)&address,  

                                 sizeof(address))<0) 

    { 

        perror("bind failed"); 

        exit(EXIT_FAILURE); 

    } 

    if (listen(server_fd, 3) < 0) 

    { 

        perror("listen"); 

        exit(EXIT_FAILURE); 

    } 

    if ((new_socket = accept(server_fd, (struct sockaddr *)&address,  

                       (socklen_t*)&addrlen))<0) 

    { 

        perror("accept"); 

        exit(EXIT_FAILURE); 

    } 



        if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 


while(1){
   valread = recv( new_socket , &input, sizeof(input), 0); 




   int j;
        for(j = 0; j < 5; j++)
        {
            printf("service: %d and port: %d\n", input.serviceArray[j], input.portArray[j]);

        }

recv( new_socket , &serviceNum, sizeof(serviceNum), MSG_DONTWAIT)

    printf("The service number you passed is %d", serviceNum);

    }


} 

РЕДАКТИРОВАТЬ - Этоявляется обновлением для сервера и является многопоточным.У меня все еще есть проблемы с блокировкой.

#include <stdio.h> 
#include <string.h> 
#include <stdlib.h>  
#include <unistd.h>
#include <time.h> 
#include <sys/socket.h> 
#include <arpa/inet.h> 
#include <stdbool.h>
#include <pthread.h>

#define PORT 8080 

 int sockfd; 
 int serviceNum;

    struct sockaddr_in servaddr, cliaddr; 

struct values {
       int serviceArray[100]; 
       int portArray[100];
       }input;

int server_fd, server_fd2, new_socket, valread, valread2; 

    struct sockaddr_in address; 

    int opt = 1; 

    int addrlen = sizeof(address);     



void *producer()
{
     // Creating socket file descriptor 

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 



    // Forcefully attaching socket to the port 8080 

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | 0, 

                                                  &opt, sizeof(opt))) 

    { 

        perror("setsockopt"); 

        exit(EXIT_FAILURE); 

    } 

    address.sin_family = AF_INET; 

    address.sin_addr.s_addr = INADDR_ANY; 

    address.sin_port = htons( PORT ); 



    // Forcefully attaching socket to the port 8080 

    if (bind(server_fd, (struct sockaddr *)&address,  

                                 sizeof(address))<0) 

    { 

        perror("bind failed"); 

        exit(EXIT_FAILURE); 

    } 

    if (listen(server_fd, 3) < 0) 

    { 

        perror("listen"); 

        exit(EXIT_FAILURE); 

    } 

    if ((new_socket = accept(server_fd, (struct sockaddr *)&address,  

                       (socklen_t*)&addrlen))<0) 

    { 

        perror("accept"); 

        exit(EXIT_FAILURE); 

    } 



        if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 
    while(1)
    {
   valread = read( new_socket , &input, sizeof(input));   
   int j;
        for(j = 0; j < 5; j++)
        {
            printf("service: %d and port: %d\n", input.serviceArray[j], input.portArray[j]);

        }
}
}

void *requestor()
{
     // Creating socket file descriptor 

    if ((server_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 



    // Forcefully attaching socket to the port 8080 

    if (setsockopt(server_fd2, SOL_SOCKET, SO_REUSEADDR | 0, 

                                                  &opt, sizeof(opt))) 

    { 

        perror("setsockopt"); 

        exit(EXIT_FAILURE); 

    } 

    address.sin_family = AF_INET; 

    address.sin_addr.s_addr = INADDR_ANY; 

    address.sin_port = htons(8090); 





    if (bind(server_fd2, (struct sockaddr *)&address,  

                                 sizeof(address))<0) 

    { 

        perror("bind failed"); 

        exit(EXIT_FAILURE); 

    } 

    if (listen(server_fd2, 3) < 0) 

    { 

        perror("listen"); 

        exit(EXIT_FAILURE); 

    } 

    if ((new_socket = accept(server_fd2, (struct sockaddr *)&address,  

                       (socklen_t*)&addrlen))<0) 

    { 

        perror("accept"); 

        exit(EXIT_FAILURE); 

    } 



        if ((server_fd2 = socket(AF_INET, SOCK_STREAM, 0)) == 0) 

    { 

        perror("socket failed"); 

        exit(EXIT_FAILURE); 

    } 
    while(1)
    {
    valread2 = read( new_socket , &serviceNum, sizeof(serviceNum)); 
    printf("The service number you passed is %d", serviceNum);
    }
}


int main()
{

//create threads
        pthread_t thread_id1, thread_id2;  


        pthread_create(&thread_id1, NULL, producer, NULL);
        pthread_create(&thread_id2, NULL, requestor, NULL); 


        pthread_join(thread_id1, NULL);  
        pthread_join(thread_id2, NULL); 







} 

1 Ответ

0 голосов
/ 14 февраля 2019

По умолчанию сокет работает в режиме , блокирующем .Таким образом, имеет смысл, что когда вы вызываете recv() в одном сокете, он будет блокировать другие сокеты в том же потоке, ожидая получения данных.

Для того, что вы пытаетесь сделать, вам нужно изменитькод вашего сервера:

  • оставьте каждый принятый сокет в режиме , блокирующем , и работайте с ними в своих отдельных рабочих потоках или разветвленных процессах.

  • переключите каждый принятый сокет в режим неблокирующий (fctrl(FIONBIO) и т. Д.), А затем используйте select(), (e)poll() или другой аналогичный механизм для контроля сокетоввместе в одном потоке, а затем реагируют на то, когда он сообщает, когда отдельные сокеты имеют активность, которую вам нужно обработать (т. е. не читать из сокета, пока на самом деле не будет доступно что-то для чтения и т. д.).

  • (только для Windows) использует каждый принятый сокет асинхронно с помощью операций перекрывающегося ввода-вывода.Пусть ОС уведомит вас, когда на каждом сокете будет активность.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...