работа с именованными каналами и семафорами в Linux - PullRequest
2 голосов
/ 21 декабря 2011

Я пытался заставить мою программу работать несколько часов, и я просто не могу понять, что не так с моим кодом. Речь идет о передаче переменной между процессами с использованием каналов. Каждый процесс увеличивает его M раз. Программа отлично работает, когда я использую общую память, но когда я переключаю ее на использование каналов, это катастрофа. Создание или использование именованных каналов, кажется, не работает вообще, или я думаю, что я делаю это неправильно. Вот исходный код:

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/mman.h>
#include <unistd.h>
#include <memory.h>
#include <fcntl.h>
#include <sys/stat.h>

#define PIPE_NAME   "MY_PIPE"
#define N 5
#define M 10

struct sembuf operations;   
int semid;          
key_t key;          
int marker;

void semWait(int semid, int sempos) {
    operations.sem_num = sempos;        
    operations.sem_op = -1;         
    operations.sem_flg = 0;         

    if (semop(semid, &operations, 1) < 0) {
        perror("ERROR: semop wait\n");
        exit(-1);
    }
}


void semPost(int semid, int sempos) {
    operations.sem_num = sempos;        
    operations.sem_op = 1;          
    operations.sem_flg = IPC_NOWAIT;    

    if (semop(semid, &operations, 1) < 0) {
        perror("ERROR: semop post\n");
        exit(-1);
    }
}


void worker(int id) {
    int j, nmarker;
    int fd = open(PIPE_NAME, O_RDWR);
    read(fd, &nmarker, sizeof(int));

    for (j = 0 ; j < M; j++) {
        semWait(semid, id);
        nmarker = nmarker + 1 ;
        printf("%d ", marker);
        semPost(semid, N);              
    }

    write(fd, &nmarker, sizeof(nmarker));
    close(fd);
}

main() {
    int i, tempPID;
    int sarray[N+1] = {0};
    key = 23;
    marker = 0;


    if ((semid = semget(key , N+1, 0666 | IPC_CREAT)) == -1) {
        perror("ERROR: semget\n");
        exit(-1);   
    }


    if ((semctl(semid, N+1, SETALL, sarray)) < 0) {     
        perror("ERROR: semctl - val\n");
        exit(-1);
    }

    if(mkfifo(PIPE_NAME, S_IFIFO | 0666) < 0) {     
        perror("ERROR:pipe\n");
        exit(-1);
    }


    int fd;
    if( fd = open(PIPE_NAME, O_WRONLY) < 0 ){       
        perror("ERROR:open\n");
        exit(-1);
    } 

    write(fd, &marker, sizeof(marker));
    close(fd);


    for(i = 0; i < N; i++) {
        tempPID = fork();
        if (tempPID < 0) {
            perror("ERROR: fork\n");
            exit(-1);
        }
        else if (tempPID == 0) {    // if child
            worker(i);
            exit(0);
        }
    }


    for (i = 0 ; i < (M*N); i++) {
        semPost(semid, i%N);        
        semWait(semid, N);          
    }

    printf("Marker = %d\n", marker);

    if (semctl( semid, 1, IPC_RMID ) == -1) {
        perror("ERROR: semctl free\n");
        exit(-1);
    }

    unlinc(PIPE_NAME);
}

Я создаю N рабочих процессов, и каждый из них должен увеличивать значение маркера M раз. Я должен создать пул «спящих» процессов и пробуждать их один за другим, используя семафоры, но это все размытие, так что текущий исходный код - все, что я придумал ...: \

Это версия той же программы, но с разделяемой памятью вместо каналов:

       #include <stdio.h>
       #include <stdlib.h>
       #include <sys/types.h>
           #include <sys/ipc.h>
         #include <sys/sem.h>
       #include <sys/mman.h>

       #define N 5
     #define M 10

        struct sembuf operations;   
     int semid;         
       key_t key;           
      int *sharedmem;


     void semWait(int semid, int sempos) {
operations.sem_num = sempos;        
operations.sem_op = -1;         
operations.sem_flg = 0;         

if (semop(semid, &operations, 1) < 0) {
    perror("ERROR: semop wait\n");
    exit(-1);
}
       }

      void semPost(int semid, int sempos) {
operations.sem_num = sempos;        
operations.sem_op = 1;          
operations.sem_flg = IPC_NOWAIT;    

if (semop(semid, &operations, 1) < 0) {
    perror("ERROR: semop post\n");
    exit(-1);
}
   }

       void worker(int id) {
int j;

for (j = 0 ; j < M; j++) {
    semWait(semid, id);
    (*sharedmem)++;
    semPost(semid, N);              
}
     }

     main() {
int i, tempPID;
int sarray[N+1] = {0};
int protect = PROT_READ | PROT_WRITE;
int flags = MAP_SHARED | MAP_ANONYMOUS;


if ((key = ftok("/dev/null", 4343)) == -1) {
    perror("ERROR: ftok\n");
    exit(-1);
}


if ((semid = semget(key , N+1, 0666 | IPC_CREAT)) == -1) {
    perror("ERROR: semget\n");
    exit(-1);   
}


if ((semctl(semid, N+1, SETALL, sarray)) < 0) {     
    perror("ERROR: semctl - val\n");
    exit(-1);
}


sharedmem = (int*)mmap(NULL, sizeof(int), protect, flags, 0, 0);
*(sharedmem) = 0;


    for(i = 0; i < N; i++) {
            tempPID = fork();
    if (tempPID < 0) {
        perror("ERROR: fork\n");
        exit(-1);
    }
            else if (tempPID == 0) {    // if child
                    worker(i);
                    exit(0);
            }
    }

for (i = 0 ; i < (M*N); i++) {
    semPost(semid, i%N);            
    semWait(semid, N);          
}

printf("Marker = %d\n", *sharedmem);

if (semctl( semid, 1, IPC_RMID ) == -1) {
    perror("ERROR: semctl free\n");
    exit(-1);
}


munmap(sharedmem, sizeof(int));
 }

1 Ответ

2 голосов
/ 21 декабря 2011

Некоторые из ваших проблем в рабочем коде - эти две строки:

int fd = open(PIPE_NAME, O_RDWR);
read(fd, &nmarker, sizeof(int));
  1. Если вы открываете канал для чтения и записи, вы напрашиваетесь на неприятности (IMNSHO). Откройте его только для чтения, прочитайте его, закройте. Затем откройте его только для записи, напишите, закройте. Теперь вы должны рассмотреть, где должна выполняться операция семафора. На самом деле вам нужно разбудить следующий процесс, прежде чем пытаться открыть канал для записи, потому что открытый для записи будет блокироваться, пока не будет доступен процесс для чтения из него. Аналогично, процесс, который открывается для чтения, будет блокироваться до тех пор, пока не будет доступен процесс для записи в него. Итак, ядро ​​будет координировать процессы.

  2. Вы не проверяете возвращаемое значение из open(), поэтому не знаете, получили ли вы действительный дескриптор файла. Всегда проверяйте статус возврата open().

  3. Вы не проверяете возвращаемое значение из read(), поэтому не знаете, получили ли вы что-либо действительное с конвейера. Всегда проверяйте статус возврата read().

(Вы можете принять решение игнорировать возвращаемый статус write(), если для неудачной записи невозможно существенное восстановление после ошибки, но проверить, что это сработало, неплохая идея. Вы можете проигнорировать возврат статус close() по тем же причинам, хотя о проблемах вы не узнаете, пока не выполните close().)

Продолжение в рабочем коде:

for (j = 0 ; j < M; j++) {
    semWait(semid, id);
    nmarker = nmarker + 1 ;
    printf("%d ", marker);
    semPost(semid, N);              
}

Удивительно, что вы печатаете marker вместо nmarker; и, конечно же, базовый метод диагностики выводит значение nmarker при чтении. Вы можете печатать или не печатать j и nmarker на каждой итерации. Обратите внимание, что поскольку в этом коде ничего не увеличивается marker, напечатанное значение не изменится.

Логическая последовательность здесь интересна ... она наиболее странным образом сочетается с циклом в main(). Родительский процесс записывает одно значение в FIFO. Только один ребенок может прочитать это значение - остальные сразу получают EOF или зависают на неопределенный срок (в зависимости от того, используете ли вы O_RDONLY или O_RDWR в дочерних элементах). Каждый ребенок получает сигнал для увеличения его значения, делает это, а затем снова ложится спать, пока не проснется снова. Нет ничего, что отправляет увеличенное значение следующему дочернему элементу. Таким образом, каждый ребенок независимо увеличивает значение, которое он выберет, что, вероятно, является мусором. В случае с разделяемой памятью, если у вас был указатель на разделяемое значение, то приращения были видны всеми процессами одновременно - вот почему это называется разделяемой памятью. Но здесь нет общей памяти, поэтому вам нужно явно общаться, чтобы она заработала. (Интересно, работала ли ваша реализация FIFO плюс с разделяемой памятью, потому что связь осуществлялась через разделяемую память - другими словами, случайно?)

Таким образом, если дочерний элемент должен увеличивать переменную, которую он читает каждый раз, он должен одновременно читать текущее значение и записывать новое значение каждый раз вокруг цикла. Конечно, это будет чтение с проверкой на ошибки. Вы могли бы быть в порядке с O_RDWR из-за семафоров, но я лично был бы счастлив с отдельными открытиями для чтения и записи - на каждой итерации, если это необходимо. Но я не реализовал это, чтобы проверить, что это действительно сталкивается с проблемами; просто использовать O_RDWR на FIFO просто.

После того, как ваш ребенок увеличил свое значение в N раз, он записывает результат в канал.

write(fd, &nmarker, sizeof(nmarker));
close(fd);

Затем основная программа выполняет:

printf("Marker = %d\n", marker);

if (semctl( semid, 1, IPC_RMID ) == -1) {
    perror("ERROR: semctl free\n");
    exit(-1);
}

unlinc(PIPE_NAME);

Так как он не изменил marker, напечатанное значение будет 0. Вы должны иметь основной процесс, читающий ответы от каждого из дочерних элементов.

Правильная функция для отмены связи FIFO: unlink() или remove().


Обсуждение

Как отмечается в комментарии, одной из проблем было то, что открытие FIFO было блокированием - никаких читателей. Однако это была далеко не единственная проблема.

Код ниже работает. Я не проверил, что число увеличивается как следует (но увеличивается). Я не проверял, что каждый процесс получает свою очередь. Я пересмотрел обработку ошибок (одна строка на вызов вместо 3 или 4) и добавил функцию печати, которая включает PID в вывод. Я проверил ошибки каждый системный вызов (но ни один из операторов печати). Я исправил проблему if (fd = open(...) < 0). Насколько я могу судить, закрытие FIFO в главном процессе отбрасывает записанный в него контент, поэтому родительский элемент больше не закрывает FIFO немедленно. Но в основном я переместил чтение и запись FIFO в рабочий цикл - оставляя открытым и закрытым снаружи. Код также пронизан диагностической печатью, поэтому я могу видеть, где он идет не так, когда он идет не так. Я не сделал минимизации заголовка или каких-либо других очисток, которые должны произойти. Однако все, кроме main(), является статичным, поэтому его не нужно предварительно объявлять. Он компилируется в соответствии с:

/usr/bin/gcc -O3 -g -std=c99 -Wall -Wextra fifocircle.c -o fifocircle 

Код

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/mman.h>
#include <unistd.h>
#include <memory.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <stdarg.h>
#include <errno.h>
#include <string.h>

static const char *arg0 = "undefined";

static void err_error(const char *fmt, ...)
{
    int errnum = errno;
    va_list args;
    fflush(0);
    fprintf(stderr, "%s: pid %d:", arg0, (int)getpid());
    va_start(args, fmt);
    vfprintf(stderr, fmt, args);
    va_end(args);
    if (errnum != 0)
        fprintf(stderr, "(%d: %s)", errnum, strerror(errnum));
    fputc('\n', stderr);
    exit(1);
}

static void print(const char *fmt, ...)
{
    va_list args;
    printf("pid %d: ", (int)getpid());
    va_start(args, fmt);
    vfprintf(stdout, fmt, args);
    va_end(args);
    fflush(0);
}

#define PIPE_NAME   "MY_PIPE"
#define N 5
#define M 10

static struct sembuf operations;
static int semid;
static key_t key;
static int marker;

static void semWait(int semid, int sempos)
{
    operations.sem_num = sempos;
    operations.sem_op = -1;
    operations.sem_flg = 0;

    if (semop(semid, &operations, 1) < 0)
        err_error("semop wait");
}

static void semPost(int semid, int sempos)
{
    operations.sem_num = sempos;
    operations.sem_op = 1;
    operations.sem_flg = IPC_NOWAIT;

    if (semop(semid, &operations, 1) < 0)
        err_error("semop post");
}

static void worker(int id)
{
    int j;
    int fd = open(PIPE_NAME, O_RDWR);
    if (fd < 0)
        err_error("failed to open FIFO %s for read & write", PIPE_NAME);
    print("Worker %d: fd %d\n", id, fd);

    for (j = 0 ; j < M; j++)
    {
        int nmarker;
        print("waiting for %d\n", id);
        semWait(semid, id);
        if (read(fd, &nmarker, sizeof(int)) != sizeof(int))
            err_error("short read from FIFO");
        print("Got %d from FIFO\n", nmarker);
        nmarker = nmarker + 1 ;
        if (write(fd, &nmarker, sizeof(nmarker)) != sizeof(nmarker))
            err_error("short write to FIFO");
        print("Wrote %d to FIFO\n", nmarker);
        print("posting %d\n", id);
        semPost(semid, N);
    }

    if (close(fd) != 0)
        err_error("failed to close FIFO");

    print("done\n");
}

int main(int argc, char **argv)
{
    int i;
    int sarray[N+1] = {0};
    key = 23;
    marker = 0;
    arg0 = argv[0];

    if (argc != 1)
        err_error("Usage: %s\n", arg0);

    if ((semid = semget(key , N+1, 0666 | IPC_CREAT)) == -1)
        err_error("semget");

    if ((semctl(semid, N+1, SETALL, sarray)) < 0)
    {
        perror("ERROR: semctl - val\n");
        exit(-1);
    }

    if (mkfifo(PIPE_NAME, S_IFIFO | 0666) < 0)
        err_error("failed to create FIFO %s\n", PIPE_NAME);
    print("FIFO created\n");

    int fd;
    if ((fd = open(PIPE_NAME, O_RDWR)) < 0 )
        err_error("failed to open FIFO %s\n", PIPE_NAME);
    print("FIFO opened\n");

    if (write(fd, &marker, sizeof(marker)) != sizeof(marker))
        err_error("short write to FIFO");
    print("FIFO loaded\n");

    print("Master: about to fork\n");
    for (i = 0; i < N; i++)
    {
        pid_t pid = fork();
        if (pid < 0)
            err_error("failed to fork");
        else if (pid == 0)
        {
            worker(i);
            exit(0);
        }
    }

    print("Master: about to loop\n");
    for (i = 0 ; i < (M*N); i++)
    {
        print("posting to %d\n", i%N);
        semPost(semid, i%N);
        print("waiting for %d\n", N);
        semWait(semid, N);
    }

    if (close(fd) != 0)
        err_error("failed to close FIFO");

    print("Marker = %d\n", marker);

    if (semctl( semid, 1, IPC_RMID ) == -1)
        err_error("semctl remove");

    if (unlink(PIPE_NAME) != 0)
        err_error("failed to remove FIFO %s", PIPE_NAME);

    return(0);
}
...