Циклические потоки обращаются к pthread mutex - PullRequest
0 голосов
/ 23 февраля 2010

я создаю это приложение, в котором у меня есть клиент, представленный потоком, работающий в цикле (пока не получит инструкцию на завершение), пытающийся получить доступ к критической секции данных на сервере.

Когда первый клиент подключается к серверу, он владеет блокировкой этого мьютекса. все последующие соединения удерживаются. Это «нормальная» часть этого.

Но затем, когда первый поток разблокируется, цикл возвращает его в начало, и он должен снова бороться за блокировку. Но предполагается, что он все еще удерживает блокировку и выполняет критическую секцию в почти бесконечном цикле (не бесконечном, потому что мы можем завершить поток, отдавая блокировку для других потоков).

Чтобы возобновить все это ... когда первый клиент подключается, он владеет блокировкой навсегда. Другие потоки остаются в очереди, пока не завершится первый поток. Вот некоторый код:

Сервер:

int main(int argc, char * argv[])
{
    int servSock;     
    unsigned short servPort;  
    unsigned int clntLen;

    rcvBuf = (char *)malloc((MAXLINE)*sizeof(char));
    pthreads = (fifo_t*)malloc(sizeof(fifo_t));

    struct sockaddr_in servAddr; 
    struct sockaddr_in clntAddr; 

    pthread_attr_t custom_sched_attr;   

    int fifo_max_prio, fifo_min_prio, fifo_mid_prio;   
    struct sched_param fifo_param;    

    pthread_attr_init(&custom_sched_attr);   
    pthread_attr_setinheritsched(&custom_sched_attr, PTHREAD_EXPLICIT_SCHED);   
    pthread_attr_setschedpolicy(&custom_sched_attr, SCHED_FIFO);

    fifo_max_prio = sched_get_priority_max(SCHED_FIFO);   
    fifo_min_prio = sched_get_priority_min(SCHED_FIFO);   
    fifo_mid_prio = (fifo_min_prio + fifo_max_prio)/2;   

    fifo_param.sched_priority = fifo_mid_prio;  

    pthread_attr_setschedparam(&custom_sched_attr, &fifo_param);

    if(argc !=2 ){
        fprintf(stderr,"Usage: %s <Server Port>\n",argv[0]);
        exit(1);
    }

    fifo_init(pthreads);

    db  = db_open("DB", O_RDWR, 0666);
    servPort = atoi(argv[1]);

    if((servSock = socket(AF_INET,SOCK_STREAM,0)) < 0){ 
        perror("Error with Socket()");
        exit(1);
    }

    memset(&servAddr,0,sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servAddr.sin_port = htons(servPort);

    if(bind(servSock,(struct sockaddr*)&servAddr,sizeof(servAddr)) < 0){
        perror("Error with bind()");
        exit(1);
    }

    if(listen(servSock,NUM_THREADS) < 0){
        perror("Error with listen()");
        exit(1);
    }

    for(;;)
    { 
        printf("A estabelecer ligaçao!\n");
        clntLen = sizeof(clntAddr);

        if((clntSock = accept(servSock,(struct sockaddr*)&clntAddr,&clntLen)) < 0)
        {
            perror("Error with accept()");
            exit(0);
        }

        printf("Ligação estabelecida!\n");

        pthread_create(&thread,&custom_sched_attr,&HandleTcpClient,(void *)clntSock);

        printf("Continua a execução nesta thread: %d\n",(int)pthread_self());
   }

   exit(0);
}

void * HandleTcpClient(void * data){

   insert(pthreads,(int)pthread_self());

   int sock = (int)data;

   char * key = (char *)malloc(FIELD * sizeof(char));
   char * dados = (char *)malloc(FIELD * sizeof(char));
   char * vendDev = (char *)malloc(FIELD * sizeof(char));
   char * str = (char*)malloc(MAXLINE * sizeof(char));

   memset(key,0,sizeof(key));
   memset(dados,0,sizeof(dados));
   memset(vendDev,0,sizeof(vendDev));
   memset(str,0,sizeof(str));

   while(1)
   {
   start:

       printf("Sem Lock: %d com socket: %d\n",(int)pthread_self(),sock);

       pthread_mutex_lock(&mutexdb);

       printf("Com Lock: %d com socket: %d\n",(int)pthread_self(),sock);
       int i = 0;
       char op;

       if((recvMsgSize = recv((sock),rcvBuf,MAXLINE,0)) &lt; 0)
       {
           perror("Erro na recepção!\n");
           exit(-1);
       }

       rcvBuf[recvMsgSize]='\0';
       str = (char *)rcvBuf;

       op = *str++;

       while(*str!='|' && *str!=0){
           key[i]=*str;
           str++;
           i++;
       }

       key[++i]='\0';
       str++;

       if((int)op==2 || (int)op==3) strcpy(vendDev,str);  // obter o numero de produtos vendidos/devolvidos
       else strcpy(dados,str);

       if(op == 4 || op == 6)
       {
           db_operate(&db,op,key,dados,sock); 
           pthread_mutex_unlock(&mutexdb);
           printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
           goto start;
       }
       else
           if(op == 7)
           {
               extract(pthreads);
               pthread_mutex_unlock(&mutexdb);
               close(sock);
               pthread_exit(0);
               break;
           }
           else
               if(op == 1 || op == 2 || op == 3 || op == 5)
               {
                   db_search(&db,op,key,vendDev,sock);
                   pthread_mutex_unlock(&mutexdb);
                   printf("Unlock: %d com socket: %d\n",(int)pthread_self(),sock);
                   goto start;
               }
   }

Может кто-нибудь сказать мне, что я здесь не так делаю? Как я могу реализовать политику, при которой, когда первый поток выполняет разблокировку, следующий поток получает его, а указанный первый поток возвращается к ожидающей строке, как в списке FIFO?

Спасибо;)

1 Ответ

4 голосов
/ 23 февраля 2010

Существует ряд проблем с кодом, который был опубликован:

  • Функция потока выделяет память, но никогда не освобождает ее.
  • Вы держите мьютекс поверх блокирующего чтения сокета - это никогда не будет хорошей идеей.Mutex должен защищать ресурс, общий для потоков - база данных в вашем случае.Сокет не является общим, поэтому не нуждается в защите.Хуже того - ожидание сокета и удержание мьютекса не позволяют другим потокам получить доступ к базе данных.
  • На стороне дизайна - мьютекс принадлежит базе данных, а не считывателю сокетов.Я предлагаю добавить мьютекс в структуру базы данных, инициализировать его во время настройки базы данных и скрыть вызовы блокировки / разблокировки внутри функций доступа к базе данных.Определите наименьший возможный критический участок и защитите его.Это дает вам лучший параллелизм.
  • Выполнение потока на сокет работает только для очень небольшого количества соединений.Крупномасштабный дизайн всегда включает неблокирующие розетки и приемы select/poll/epoll/kqueue.

Я не совсем понял часть вопроса FIFO, но я надеюсь, что приведенные выше пункты помогут вам в правильном направлении.

...