C ++ Управление процессами - PullRequest
1 голос
/ 28 февраля 2012

У меня есть один процесс, который создает 10 подпроцессов, и все они разделяют сегмент памяти.10 подпроцессов могут выполняться одновременно, но я хочу, чтобы отец спал в это время.Когда дети заканчивают, они идут спать.Когда все дети сделаны, отец делает некоторые вычисления и разбудит детей для следующего раунда (и он снова ложится спать, пока все дети не закончат свои вычисления).Я думал, что я могу сделать pause () внутри цикла while, что обусловлено check_children_finished ().Дети знают, что нужно подать сигнал родителю с помощью kill (), и у них есть свой pid (отправленный им в качестве аргумента), а на самом деле у отца также есть pid детей (сохраненный в общей памяти). Вот некоторый код -


Родительский код:


#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <sys/types.h>

#include <sys/ipc.h> 
#include <sys/shm.h> 
#include <sys/stat.h>
#include <unistd.h>
#include <cstring>
#include <string>
#include <sstream>
#include <iostream>

using namespace std;

static void sig_usr(int);
static volatile sig_atomic_t *sharedArray;
static int segmentId; 
int number_of_children = 10;

bool check_children_finished(){
  for (int i=1; i<number_of_children; i++)
    if (sharedArray[i*2] == -1)
      return false;
  return true;
}

void sig_usr (int signo)
{
  /*
  // This is signals handler (when a signal fires this is what runs)
  // we expect SIGUSR1 from the child process
  if(signo == SIGUSR1)
    cout << "(parent: " << (int)getpid() << ") --- caught SIGUSR1" << endl;
  if (signo == SIGUSR2){
    cout << "(parent: " << (int)getpid() << ") --- caught SIGUSR2" << endl;
  }else
    perror("unexpected signal fired");
  //  return;
  */
}


int main()
{

  // size of the shared memory array
  int arrSize = 100;
  const int shareSize = sizeof(sig_atomic_t) * (arrSize);
  /* Allocate shared memory segment */  
  segmentId = shmget(IPC_PRIVATE, shareSize, S_IRUSR | S_IWUSR); 
  sharedArray = (sig_atomic_t *) shmat(segmentId, NULL, 0);

  // feel shared memory with -1
  for (int i=0; i<arrSize; i++)
    sharedArray[i] = -1;

  // binding SIGUSR1 & SIGUSR2 to sig_usr method
  // we need to override SIGUSR2 because of group signaling
  signal(SIGUSR1, sig_usr);
  //  signal(SIGUSR2,sig_usr);

  fprintf(stderr, "\n (parnet) myPid=%d ; segId=%d\n",(int)getpid(), segmentId);
  sharedArray[0] = 123;
  int kids = 0; // this is the number of child processes
  // we send to the child (as shell parameters) the parent pid, shared segment address , and index to the shared memory(this is where he will write his pid and in index+1 the heuristic value)
  char* kidsParams[5];

  // takes care of param[0]=command to run
  string exec_line = "./child";
  kidsParams[0] = new char[exec_line.size()+1];
  memcpy(kidsParams[0], exec_line.c_str(), exec_line.size());

  // takes care of param[1]=parent pid
  kidsParams[1] = new char[100]; // = malloc(100*sizeof(char));
  sprintf( kidsParams[1],"%d",(int)getpid());

  // takes care of param[2]=shared mem segment address
  kidsParams[2] = new char[100];
  sprintf( kidsParams[2],"%d",segmentId);

  // takes care of param[3]=the child private index in shared mem
  kidsParams[3] = new char[100];

  kidsParams[4] = NULL;  // needed as end of array
  int index = 0;
  for(; kids<number_of_children; kids++) {
    sprintf( kidsParams[3],"%d",index);
    index+=2;
    pid_t childpid = fork();
    if(childpid==0){
      execv(kidsParams[0],kidsParams);
    }
   }
  cout << "(parent) --- just finished creating " << number_of_children << " kids" << endl;
  cout << "(parent) entering to while {...} pause" << endl;
  for (int i=0; i<number_of_children; i++)
    cout << "[" << i << "] = " << sharedArray[i];
  cout << endl;
  // going to sleep --- here I want while loop with conditioning that all children finished
  while ( ! check_children_finished() ) {
    cout << "(parent) now will signal the group" << endl;
    // killpg sends signal to the group (all the children). note that the group has the same pid as the father
    killpg(getpid(),SIGUSR2);
    cout << "(parent) just finished signaling the group" << endl;
    pause();
    for (int i=0; i<number_of_children; i++)
       cout << "[" << i << "] = " << sharedArray[i];
    cout << endl;
  }
  cout << "(parent) exited the while{...} paused" << endl;


  // removes shared memory
  //  shmdt (sharedArray);  
  //  shmctl (segmentId, IPC_RMID, NULL);  
  // note that all children must also shmctl (...IPC_RMID...);

}

Вот дочерний код:


(same includes...)
using namespace std;

// declare the function proptotype (needed in signal function)
static void sig_usr(int);

// handles the signal (what happens when the signal fires) --- here I want to solve the search problem
void sig_usr (int signo)
{
  /*
  if(signo == SIGUSR1){
    cout << "(child: " << (int)getpid() << ") --- caught SIGUSR1" << endl;
  }else if(signo == SIGUSR2){
    cout << "(child: " << (int)getpid() << ") --- caught SIGUSR2" << endl;
  }else
    perror("eerrrr");
  //  return;
  */

}

int main(int argc, char** argv){
  // binding the signal to the handler
  signal(SIGUSR2,sig_usr);
  int segmentId;  
  volatile sig_atomic_t *sharedArray ;
  int myIndex;
  int myData =  5; 
  int parentPid;
  // read parameters
  parentPid = atoi(argv[1]);
  segmentId = atoi(argv[2]);
  myIndex = atoi(argv[3]);

  // declare a pointer to the shared memory
  sharedArray = (sig_atomic_t *) shmat(segmentId, NULL, 0);
  sharedArray[myIndex] =(int)getpid();
  sharedArray[myIndex+1] = myData;
  //  fprintf(stderr, "My Group Pid(child): %d\n",(int)getpgrp());
  cout << "(child: " << (int)getpid() << ") --- going to sleep" << endl;
  pause();
  cout << "(child: " << (int)getpid() << ") --- I woke up" << endl;
  //calc data

  //fprintf(stderr, "My Pid(child): %d\n",(int)getpid());
  //fprintf(stderr, "I got %d (child)\n",sharedArray[0] );


  // this signals the father
  kill(parentPid,SIGUSR1);
  cout << "fired SIGUSR1"<< endl;
}

Вот типичныйвывод:


 (parnet) myPid=3104 ; segId=22872080
(parent) --- just finished creating 10 kids
(parent) entering to while {...} pause
[0] = 123[1] = -1[2] = -1[3] = -1[4] = -1[5] = -1[6] = -1[7] = -1[8] = -1[9] = -1
(parent) now will signal the group
User defined signal 2

Иногда я получаю что-то вроде:


 (parnet) myPid=3126 ; segId=22937618
(child: 3129) --- going to sleep
(child: 3127) --- going to sleep
(parent) --- just finished creating 10 kids
(parent) entering to while {...} pause
[0] = 3127[1] = 5[2] = 3128[3] = 5[4] = 3129[5] = 5[6] = -1[7] = -1[8] = -1[9] = -1
(parent) now will signal the group
User defined signal 2
(child: 3127) --- I woke up
fired SIGUSR1
(child: 3128) --- going to sleep
(child: 3132) --- going to sleep
(child: 3129) --- I woke up
fired SIGUSR1

Может кто-нибудь предложить решение?Спасибо!- Лирон

Ответы [ 3 ]

4 голосов
/ 28 февраля 2012

Я бы предложил использовать каналы между процессом монитора и другими рабочими процессами с простым текстовым протоколом для управления связью.

(поскольку текстовый протокол на каналах более надежен, чем сигналы - которые могут быть "потеряны" или "объединены")

Итак, перед тем, как разбудить рабочих, я бы позвонил (например, 10 раз или, возможно, 2 * 10 раз, если вы хотите их обоих) pipe (2) , чтобы создать управляющую связь. Тогда вы можете использовать ppoll (2) (или просто poll) для мультиплексирования каналов.

Но вы рассматривали возможность использования существующих фреймворков , например, например. Open-MPI (реализация MPI = "Интерфейс передачи сообщений"). Это не значит отказаться от использования сегмента общей памяти (просто используйте MPI для контроля и синхронизации). Или, возможно, используя OpenMP или просто p-threads ?

MPI очень используется в высокопроизводительных численных вычислениях, и многие суперкомпьютеры имеют его (используют специализированное оборудование для очень быстрой передачи сообщений).

(Конечно, суперкомпьютеры - это кластеры, поэтому у вас нет общей памяти между тысячами ядер; вы воспользуетесь ими, используя в своем коде только MPI ...)

(Вы также можете исследовать с помощью GPGPU через OpenCL )

1 голос
/ 28 февраля 2012

Обычный способ синхронизации процессов - это использование семафоров.

Более сложные операции семафора предоставляются SysV с использованием semctl (), semop () и semget (). Семафоры POSIX (которые проще) используют sem_open (), sem_close (), sem_post () и sem_wait ().

В зависимости от того, что вы используете, создайте семафор в главном процессе перед запуском дочерних элементов и удалите семафор после того, как все дочерние элементы были получены при выходе из основного процесса.

Каждый из потомков должен выполнить sem_open () (без O_CREAT) на семафоре, а затем sem_wait (), где они будут блокироваться при условии, что вы правильно инициализировали семафор. Когда функция sem_wait () возвращает код, который касается общей памяти, а затем вызывает функцию sem_post (). Это должно дать эксклюзивный общий доступ к вашему сегменту общей памяти.

Использование сигналов кажется плохой идеей. Я, конечно, никогда не слышал о том, чтобы делать это таким образом.

По моему опыту, у меня был отдельный процесс, который создавал все семафоры SysV и копировал идентификаторы семафоров в подходящее место в сегменте разделяемой памяти. Это все, что сделал процесс, и затем он вышел. Семафоры являются постоянными - тогда я запускаю другие процессы из командной строки, которые открывают семафор (без O_CREAT) и выполняют sem_wait (). Поэтому я не использовал fork / exec для запуска дочерних процессов. Отдельные процессы из командной строки казались проще.

Я не уверен, наследуют ли потомки процессов идентификаторы семафоров, как наследуют файловые дескрипторы

0 голосов
/ 29 февраля 2012

Действительно, использование семафоров делает вещи немного проще и, наконец, работает.Обратите внимание, что вы должны скомпилировать с флагом -pthread (например, g ++ -Wall -pthread chikd.c -o parent).У меня есть небольшая проблема, кто-нибудь знает, могу ли я сделать что-то лучше, чем спать (1)?(родительский код, строка 109).

Родительский код:

#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <sys/types.h>
#include <sys/ipc.h> 
#include <sys/shm.h> 
#include <sys/stat.h>
#include <unistd.h>
#include <cstring>
#include <string>
#include <sstream>
#include <iostream>
#include <semaphore.h>
#include <fcntl.h>

using namespace std;

char SEM_NAME[] = "lir";

static void sig_usr(int);
static sig_atomic_t *sharedArray; // static volatile sig_atomic_t *sharedArray;
static int segmentId; 

int number_of_children = 10;
int child_init_ctrl = 0;
int child_finish_ctrl = 1;
int number_of_children_ctrl = 2;
int indexes_start_ctrl = 3;
// each child has its own : PID,DATA,ADDRESS_TO_HIS_INIT_STATE
int data_start = indexes_start_ctrl + number_of_children*3;

int getMyStartIndex(){
  return -1;
}

void sig_usr (int signo) {}

int main() {

  // create&init *new* semaphore
  sem_t *mutex;
  mutex = sem_open (SEM_NAME,O_CREAT,0644,1);
  if(mutex == SEM_FAILED) {
    perror("unable to create semaphore");
    sem_unlink(SEM_NAME);
    exit(-1);
  }

  // size of the shared memory array
  int arrSize = 100;
  const int shareSize = sizeof(sig_atomic_t) * (arrSize);
  /* Allocate shared memory segment */  
  segmentId = shmget(IPC_PRIVATE, shareSize, S_IRUSR | S_IWUSR); 
  sharedArray = (sig_atomic_t *) shmat(segmentId, NULL, 0);

  // fill shared memory with -1
  for (int i=0; i<arrSize; i++)
    sharedArray[i] = -1;

  // binding SIGUSR1 & SIGUSR2 to sig_usr method
  // we need to override SIGUSR2 because of group signaling
  signal(SIGUSR1, sig_usr);
  signal(SIGUSR2,sig_usr);

  sem_wait(mutex);
  sharedArray[child_init_ctrl] = 0;
  sharedArray[child_finish_ctrl] = 0;
  sharedArray[number_of_children_ctrl] = number_of_children;
  sem_post(mutex);

  // we send to the child (as shell parameters) the parent pid, shared segment address , and index to the shared memory(this is where he will write his pid and in index+1 the     heuristic value)
  char* kidsParams[6];

  // takes care of param[0]=command to run
  string exec_line = "./child";
  kidsParams[0] = new char[exec_line.size()+1];
  memcpy(kidsParams[0], exec_line.c_str(), exec_line.size());

  // takes care of param[1]=parent pid
  kidsParams[1] = new char[100]; // = malloc(100*sizeof(char));
  sprintf( kidsParams[1],"%d",(int)getpid());

  // takes care of param[2]=shared mem segment address
  kidsParams[2] = new char[100];
  sprintf( kidsParams[2],"%d",segmentId);

  // takes care of param[3]=the child private index in shared mem
  kidsParams[3] = new char[100];

  // takes care of param[4]=the child's first index of start state
  kidsParams[4] = new char[100]; // = malloc(100*sizeof(char));
  sprintf( kidsParams[4],"%d",getMyStartIndex());

  kidsParams[5] = NULL;  // needed as end of array

  // creates the child processes (and update child private index)
  int index = 3;
  for(int kids=0; kids<number_of_children; kids++) {
    sprintf( kidsParams[3],"%d",index);
    index+=3;
    pid_t childpid = fork();
    if(childpid==0){
      execv(kidsParams[0],kidsParams);
    }
  }

  while ( sharedArray[child_init_ctrl] != number_of_children ) {
    // waiting on the CPU !!! or going to sleep. is there a better solution?
    sleep(1);
  }

  // killpg sends signal to the group (all the children). note that the group has the same     pid as the father
  killpg (getpid(),SIGUSR2);

  // going to sleep --- here I want while loop with conditioning that all children finished
  bool all_child_finished_flag = false;
  while ( all_child_finished_flag == false ) {
    sem_wait(mutex);
    if (sharedArray[child_finish_ctrl] == number_of_children)
      all_child_finished_flag = true;
    sem_post(mutex);
    if (all_child_finished_flag == false)
      pause();
  }   


  // removes shared memory and delete semaphore
  shmdt (sharedArray);  
  shmctl (segmentId, IPC_RMID, NULL);  
  // note that all children must also shmctl (...IPC_RMID...);
  // removes the mutex
  sem_close(mutex);
  sem_unlink(SEM_NAME);

}

Дочерний код:

// declare the function proptotype (needed in signal function)
static void sig_usr(int);
int child_init_ctrl = 0;
int child_finish_ctrl = 1;
int number_of_children_ctrl = 2;
int number_of_children=0;

// handles the signal (what happens when the signal fires)
void sig_usr (int signo) {}

char SEM_NAME[] = "lir";

int main(int argc, char** argv) {
    (same includes...)

      // create&init *existing* semaphore
      sem_t *mutex;
      mutex = sem_open (SEM_NAME, 0, 0644, 0);
      if(mutex == SEM_FAILED) {
        perror("reader:unable to execute semaphore");
        sem_close(mutex);
        exit(-1);
      }

      // binding the signal to the handler
      signal(SIGUSR2,sig_usr);

      volatile sig_atomic_t *sharedArray ;
      int myData=0;
      // read parameters
      int parentPid = atoi(argv[1]);
      int segmentId = atoi(argv[2]);
      int myIndex = atoi(argv[3]);
      int myStartState = atoi(argv[4]);

      // declare a pointer to the shared memory
      sharedArray = (sig_atomic_t *) shmat(segmentId, NULL, 0);
      sharedArray[myIndex] =(int)getpid();
      sharedArray[myIndex+1] = myData;
      number_of_children = sharedArray[number_of_children_ctrl];

      sem_wait(mutex);
      sharedArray[child_init_ctrl]++;
      sem_post(mutex);

      pause();

      // do some "calculations"
      sharedArray[myIndex+1] = myIndex;
      for (int i=0; i<10; i++)
        sharedArray[myIndex+1] += i;

      // this signals the father
      bool should_i_fire = false;
      sem_wait(mutex);
      sharedArray[child_finish_ctrl]++;
      if (sharedArray[child_finish_ctrl] == number_of_children)
        should_i_fire = true;
      sem_post(mutex);
      if (should_i_fire == true)
        kill(parentPid,SIGUSR1);
    }

Best, - Liron

...