У меня сложилось впечатление, что flock (2) потокобезопасен, я недавно натолкнулся на случай в коде, где несколько потоков могут получить блокировку для одного файла, которые все синхронизируется с использованием получения эксклюзивной блокировки с помощью c api flock. Процесс 25554 является многопоточным приложением, имеющим 20 потоков, количество потоков, имеющих блокировку для одного и того же файла, зависит от взаимоблокировки. Многопоточное приложение testEvent записывает данные в файл, где была выдвинута программа чтения из файла. К сожалению, lsof
не печатает значение LWP, поэтому я не могу найти, какие потоки удерживают блокировку. Когда возникает указанное ниже условие, процесс и потоки застревают в вызове flock, как показано вызовом pstack
или strace
для pid 25569 и 25554. Любые предложения о том, как преодолеть это в RHEL 4.x.
Одна вещь, которую я хотел обновить, это то, что flock не ведет себя неправильно все время, когда скорость передачи сообщений составляет более 2 Мбит / с только тогда, когда я вхожу в эту проблему взаимоблокировки с flock, ниже этой скорости передачи все файлы. Я сохранил константу num_threads
= 20, size_of_msg
= 1000 байтов и просто изменил количество сообщений tx в секунду, начиная с 10 сообщений до 100 сообщений, что составляет 20 *1000* 100 = 2 Мбит / с, когда я увеличиваю количество сообщения до 150, тогда стая проблема происходит.
Я просто хотел спросить, что ты думаешь о flockfile c api.
sudo lsof filename.txt
COMMAND PID USER FD TYPE DEVICE SIZE NODE NAME
push 25569 root 11u REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 27uW REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 28uW REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 29uW REG 253.4 1079 49266853 filename.txt
testEvent 25554 root 30uW REG 253.4 1079 49266853 filename.txt
Многопоточная тестовая программа, которая будет вызывать функцию write_data_lib_func
lib.
void* sendMessage(void *arg) {
int* numOfMessagesPerSecond = (int*) arg;
std::cout <<" Executing p thread id " << pthread_self() << std::endl;
while(!terminateTest) {
Record *er1 = Record::create();
er1.setDate("some data");
for(int i = 0 ; i <=*numOfMessagesPerSecond ; i++){
ec = _write_data_lib_func(*er1);
if( ec != SUCCESS) {
std::cout << "write was not successful" << std::endl;
}
}
delete er1;
sleep(1);
}
return NULL;
Вышеуказанный метод будет вызываться в pthreads в основной функции теста.
for (i=0; i<_numThreads ; ++i) {
rc = pthread_create(&threads[i], NULL, sendMessage, (void *)&_num_msgs);
assert(0 == rc);
}
Вот источник писателя / читателя. Из-за проприетарных причин, которые я не хотел просто вырезать и вставлять, источник писателя получит доступ к нескольким потокам в процессе
int write_data_lib_func(Record * rec) {
if(fd == -1 ) {
fd = open(fn,O_RDWR| O_CREAT | O_APPEND, 0666);
}
if ( fd >= 0 ) {
/* some code */
if( flock(fd, LOCK_EX) < 0 ) {
print "some error message";
}
else {
if( maxfilesize) {
off_t len = lseek ( fd,0,SEEK_END);
...
...
ftruncate( fd,0);
...
lseek(fd,0,SEEK_SET);
} /* end of max spool size */
if( writev(fd,rec) < 0 ) {
print "some error message" ;
}
if(flock(fd,LOCK_UN) < 0 ) {
print some error message;
}
На стороне читателя есть процесс-демон без потоков.
int readData() {
while(true) {
if( fd == -1 ) {
fd= open (filename,O_RDWR);
}
if( flock (fd, LOCK_EX) < 0 ) {
print "some error message";
break;
}
if( n = read(fd,readBuf,readBufSize)) < 0 ) {
print "some error message" ;
break;
}
if( off < n ) {
if ( off <= 0 && n > 0 ) {
corrupt_file = true;
}
if ( lseek(fd, off-n, SEEK_CUR) < 0 ) {
print "some error message";
}
if( corrupt_spool ) {
if (ftruncate(fd,0) < 0 ) {
print "some error message";
break;
}
}
}
if( flock(fd, LOCK_UN) < 0 )
print some error message ;
}
}
}