несколько потоков могут получать стадо одновременно - PullRequest
10 голосов
/ 27 февраля 2012

У меня сложилось впечатление, что flock (2) потокобезопасен, я недавно натолкнулся на случай в коде, где несколько потоков могут получить блокировку для одного файла, которые все синхронизируется с использованием получения эксклюзивной блокировки с помощью c api flock. Процесс 25554 является многопоточным приложением, имеющим 20 потоков, количество потоков, имеющих блокировку для одного и того же файла, зависит от взаимоблокировки. Многопоточное приложение testEvent записывает данные в файл, где была выдвинута программа чтения из файла. К сожалению, lsof не печатает значение LWP, поэтому я не могу найти, какие потоки удерживают блокировку. Когда возникает указанное ниже условие, процесс и потоки застревают в вызове flock, как показано вызовом pstack или strace для pid 25569 и 25554. Любые предложения о том, как преодолеть это в RHEL 4.x.

Одна вещь, которую я хотел обновить, это то, что flock не ведет себя неправильно все время, когда скорость передачи сообщений составляет более 2 Мбит / с только тогда, когда я вхожу в эту проблему взаимоблокировки с flock, ниже этой скорости передачи все файлы. Я сохранил константу num_threads = 20, size_of_msg = 1000 байтов и просто изменил количество сообщений tx в секунду, начиная с 10 сообщений до 100 сообщений, что составляет 20 *1000* 100 = 2 Мбит / с, когда я увеличиваю количество сообщения до 150, тогда стая проблема происходит.

Я просто хотел спросить, что ты думаешь о flockfile c api.

 sudo lsof filename.txt
    COMMAND       PID     USER     FD       TYPE     DEVICE     SIZE   NODE       NAME
    push         25569    root     11u       REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     27uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     28uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     29uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     30uW      REG      253.4      1079   49266853   filename.txt

Многопоточная тестовая программа, которая будет вызывать функцию write_data_lib_func lib.

void* sendMessage(void *arg)  {

int* numOfMessagesPerSecond = (int*) arg;
std::cout <<" Executing p thread id " << pthread_self() << std::endl;
 while(!terminateTest) {
   Record *er1 = Record::create();
   er1.setDate("some data");

   for(int i = 0 ; i <=*numOfMessagesPerSecond ; i++){
     ec = _write_data_lib_func(*er1);
     if( ec != SUCCESS) {
       std::cout << "write was not successful" << std::endl;

     }

   }
   delete er1;
   sleep(1);
 }

 return NULL;

Вышеуказанный метод будет вызываться в pthreads в основной функции теста.

for (i=0; i<_numThreads ; ++i) {
  rc = pthread_create(&threads[i], NULL, sendMessage, (void *)&_num_msgs);
  assert(0 == rc);

}

Вот источник писателя / читателя. Из-за проприетарных причин, которые я не хотел просто вырезать и вставлять, источник писателя получит доступ к нескольким потокам в процессе

int write_data_lib_func(Record * rec) {      
if(fd == -1 ) {  
    fd = open(fn,O_RDWR| O_CREAT | O_APPEND, 0666);
} 
if ( fd >= 0 ) {
   /* some code */ 

   if( flock(fd, LOCK_EX) < 0 ) {
     print "some error message";
   }
   else { 
    if( maxfilesize) {
      off_t len = lseek ( fd,0,SEEK_END);
      ...
      ... 
      ftruncate( fd,0);
      ...
      lseek(fd,0,SEEK_SET); 
   } /* end of max spool size */ 
   if( writev(fd,rec) < 0 ) {
     print "some error message" ; 
   }

   if(flock(fd,LOCK_UN) < 0 ) {
   print some error message; 
   } 

На стороне читателя есть процесс-демон без потоков.

int readData() {
    while(true) {
      if( fd == -1 ) {
         fd= open (filename,O_RDWR);
      }
      if( flock (fd, LOCK_EX) < 0 ) { 
        print "some error message"; 
        break; 
      } 
      if( n = read(fd,readBuf,readBufSize)) < 0 ) { 
        print "some error message" ;
        break;
      }  
      if( off < n ) { 
        if ( off <= 0 && n > 0 ) { 
          corrupt_file = true; 
        } 
        if ( lseek(fd, off-n, SEEK_CUR) < 0 ) { 
          print "some error message"; 
        } 
        if( corrupt_spool ) {  
          if (ftruncate(fd,0) < 0 ) { 
             print "some error message";
             break;
           }  
        }
      }
      if( flock(fd, LOCK_UN) < 0 ) 
       print some error message ;
      }  
   }     
}

Ответы [ 2 ]

9 голосов
/ 27 февраля 2012

flock (2) задокументировано как «блокировка, если несовместимая блокировка удерживается другим процессом » и с «блокировками, созданными функцией flock (), связанными с записью таблицы открытых файлов», поэтому следует ожидать, что блокировки flock с несколькими потоками одного и того же процесса не взаимодействуют. (flock документация не затрагивает темы).

Следовательно, решение должно быть простым для вас: свяжите один pthread_mutex_t с каждым flock -без дескриптором файла и защитите вызов flock с этим мьютексом. Вы также можете использовать pthread_rwlock_t , если хотите заблокировать чтение или запись.

2 голосов
/ 28 февраля 2012

На справочной странице Linux для flock (2):

Блокировки, созданные flock (), связаны с записью таблицы открытых файлов.Это означает, что дубликаты файловых дескрипторов (созданные, например, с помощью fork (2) или dup (2)) ссылаются на одну и ту же блокировку, и эта блокировка может быть изменена или снята с использованием любого из этих дескрипторов.Кроме того, блокировка снимается либо явной операцией LOCK_UN на любом из этих дублирующих дескрипторов, либо когда все такие дескрипторы закрыты.

Кроме того, блокировки стада не «складываются»,если вы попытаетесь получить блокировку, которую вы уже держите, вызов flock - это noop, который сразу же возвращается без блокировки и без изменения состояния блокировки.

Поскольку потоки в дескрипторах файлов общего процесса разделяются, вы можете собиратьсяфайл несколько раз из разных потоков, и он не будет блокироваться, так как блокировка уже удержана.

Также из заметок на flock (2):

flock ()и блокировки fcntl (2) имеют различную семантику по отношению к разветвленным процессам и dup (2).В системах, которые реализуют flock () с использованием fcntl (2), семантика flock () будет отличаться от описанной на этой странице руководства.

...