Как запускать конвейерные задачи с ограниченным числом вилок? - PullRequest
0 голосов
/ 04 ноября 2019

У меня есть простая программа, в которой я хочу смоделировать условие, что у меня недостаточно мощности вилки, поэтому я ограничиваю число вилок при выполнении задач конвейера.

разрешает работу конвейера, похожего на оболочкунаписано в C ++:

ls | cat | cat | cat | cat | cat | cat | cat | cat

У меня есть код, который запускает pipe() и fork():

#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

const int fork_limit = 3;
int fork_counter = 0;

static void sig_chld_handler(int signo) {
  int status;
  pid_t pid;
  while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
    printf("received SIGCHLD from child process %d\n", pid);
    fork_counter -= 1;
    fprintf(stdout, "counter --, %d\n", fork_counter);
  }
}

int main(int argc, char **argv) {

  signal(SIGCHLD, sig_chld_handler);

  char **cmds[9];

  char *p1_args[] = {"ls", NULL};
  char *p2_args[] = {"cat", NULL};

  cmds[0] = p1_args;
  cmds[1] = p2_args;
  cmds[2] = p2_args;
  cmds[3] = p2_args;
  cmds[4] = p2_args;
  cmds[5] = p2_args;
  cmds[6] = p2_args;
  cmds[7] = p2_args;
  cmds[8] = p2_args;


  int pipes[16];
  pipe(pipes);     // sets up 1st pipe
  pipe(pipes + 2); // sets up 2nd pipe
  pipe(pipes + 4);
  pipe(pipes + 6);
  pipe(pipes + 8);
  pipe(pipes + 10);
  pipe(pipes + 12);
  pipe(pipes + 14);


  pid_t pid;

  for (int i = 0; i < 9; i++) {

    // === comment this part to run correctly ===
    while (fork_limit < fork_counter) {
      usleep(10000);
    }
    // ===

    pid = fork();
    if (pid == 0) {
      fprintf(stdout, "fork p%d\n", i);

      // read
      if (i != 0) {
        if (dup2(pipes[(i - 1) * 2], 0) < 0) {
          fprintf(stderr, "dup2 error\n");
          exit(EXIT_FAILURE);
        }
      }

      // write
      if (i != 8) {
        if (dup2(pipes[i * 2 + 1], 1) < 0) {
          fprintf(stderr, "dup2 error\n");
          exit(EXIT_FAILURE);
        }
      }

      for (int j = 0; j < 16; j++) {
        close(pipes[j]);
      }

      execvp(*cmds[i], cmds[i]);
    } else {
      fork_counter += 1;
      fprintf(stdout, "counter ++, %d \n", fork_counter);
    }
  }

  for (int j = 0; j < 16; j++) {
    close(pipes[j]);
  }

  waitpid(pid, NULL, 0); // wait the last one.

  std::cout << "Parent done." << std::endl;
}

Строка while (fork_limit < fork_counter) - это то, что я ограничиваю дочерний номер. Если я удаляю блок while, код работает хорошо, но он зависает, если я добавлю это.

Я полагаю, что предыдущие дети умрут, так что fork_counter -= 1, и новый ребенок может быть разветвлен, но поведение не так, и я не могу понять, почему.


Результат без while.

counter ++, 1 
counter ++, 2 
fork p0
fork p1
counter ++, 3 
fork p2
counter ++, 4 
counter ++, 5 
fork p3
fork p4
counter ++, 6 
fork p5
counter ++, 7 
counter ++, 8 
fork p6
fork p7
counter ++, 9 
fork p8
received SIGCHLD from child process 13316
counter --, 8
Applications
Desktop
Documents
Downloads
Library
Movies
Music
Pictures
received SIGCHLD from child process 13319
counter --, 7
received SIGCHLD from child process 13318
counter --, 6
received SIGCHLD from child process 13317
counter --, 5
received SIGCHLD from child process 13320
counter --, 4
received SIGCHLD from child process 13322
counter --, 3
received SIGCHLD from child process 13321
counter --, 2
received SIGCHLD from child process 13323
counter --, 1
received SIGCHLD from child process 13324
counter --, 0
Parent done.

Результат с while,это означает, что я ограничиваю число вилки.

counter ++, 1 
counter ++, 2 
fork p0
fork p1
counter ++, 3 
counter ++, 4 
fork p2
fork p3
received SIGCHLD from child process 13291
counter --, 3
counter ++, 4 
fork p4

(hang)

Ответы [ 2 ]

2 голосов
/ 04 ноября 2019

Программа main выполняет (последовательно) следующее:

  1. Предварительно создайте все каналы
  2. Дочерние объекты-вилки, использующие каналы (каждый дочерний элемент закрывает все унаследованные каналы)
  3. Закрыть все трубы

Проблема в том, что время «закрыть все трубы». Поскольку main ожидает завершения первого дочернего элемента (while (fork_limit < fork_counter)), прежде чем он сможет завершить шаг № 2.

Однако дочерние элементы cat (например, 1-й cat) могутне завершать, пока их входной канал не будет закрыт ВСЕМИ процессами, включая он main, который ожидает их завершения. Фактически тупик.

Рассмотрим небольшую модификацию процесса main, которая закроет каналы для каждого дочернего элемента, как только дочерние элементы разветвляются:

if ( fork() ) {
   // Children
   ...

} else {
   // Main - close pipes ASAP.
      close(pipes[(i-1)*2]) ;
      close(pipes[(i-1)*2+1]);
      fork_counter += 1;
      fprintf(stdout, "counter ++, %d \n", fork_counter);
}

Вероятно, некоторая модификация длязакрытие трубы у детей также необходимо.

0 голосов
/ 05 ноября 2019

спасибо за ответ @ dash-o

Работает как:

#include <errno.h>
#include <fcntl.h>
#include <iostream>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

const int fork_limit = 4;
int fork_counter = 0;

static void sig_chld_handler(int signo) {
  int status;
  pid_t pid;
  while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
    printf("received SIGCHLD from child process %d\n", pid);
    fork_counter -= 1;
    fprintf(stdout, "counter --, %d\n", fork_counter);
  }
}

int main(int argc, char **argv) {

  signal(SIGCHLD, sig_chld_handler);

  char **cmds[9];

  char *p1_args[] = {"ls", NULL};
  char *p2_args[] = {"cat", NULL};

  cmds[0] = p1_args;
  cmds[1] = p2_args;
  cmds[2] = p2_args;
  cmds[3] = p2_args;
  cmds[4] = p2_args;
  cmds[5] = p2_args;
  cmds[6] = p2_args;
  cmds[7] = p2_args;
  cmds[8] = p2_args;

  int pipes[16];
  pipe(pipes);     // sets up 1st pipe
  pipe(pipes + 2); // sets up 2nd pipe
  pipe(pipes + 4);
  pipe(pipes + 6);
  pipe(pipes + 8);
  pipe(pipes + 10);
  pipe(pipes + 12);
  pipe(pipes + 14);

  pid_t pid;

  for (int i = 0; i < 9; i++) {

    while (fork_limit < fork_counter) {
      usleep(10000);
    }

    pid = fork();
    if (pid == 0) {
      fprintf(stdout, "fork p%d\n", i);

      // read
      if (i != 0) {
        if (dup2(pipes[(i - 1) * 2], 0) < 0) {
          fprintf(stderr, "dup2 error\n");
          exit(EXIT_FAILURE);
        }
      }

      // write
      if (i != 8) {
        if (dup2(pipes[i * 2 + 1], 1) < 0) {
          fprintf(stderr, "dup2 error\n");
          exit(EXIT_FAILURE);
        }
      }

      for (int j = 0; j < 16; j++) {
        close(pipes[j]);
      }

      execvp(*cmds[i], cmds[i]);
    } else {

      if (i != 0) {
        close(pipes[(i - 1) * 2]);
        close(pipes[(i - 1) * 2 + 1]);
      }

      fork_counter += 1;
      fprintf(stdout, "counter ++, %d \n", fork_counter);
    }
  }

  for (int j = 0; j < 16; j++) {
    close(pipes[j]);
  }

  waitpid(pid, NULL, 0); // wait the last one.

  std::cout << "Parent done." << std::endl;
}
...