Истинная неблокирующая двусторонняя связь между родительским и внешним дочерним процессом - PullRequest
2 голосов
/ 04 июля 2011

Я прочитал около 50 постов и учебных пособий по этой теме, я скопировал, написал и протестировал около 20 альтернатив и провел все возможные исследования, о которых только мог придумать. Тем не менее, я не видел рабочего решения для следующей проблемы:

Родительский процесс A хочет передать данные во внешний процесс B, разрешить процессу B изменить данные и передать их обратно в родительский процесс A, а затем продолжить с родительским процессом A. Процесс B является частью внешнего набора программ, который у меня есть. не влияет на это, и это обычно выполняется так в командной строке UNIX:

< input_data program_B1 | program_B2 | program_B3 > output_data

... где

input_data, output_data: некоторые данные, которые обрабатываются в программах B1-B3

program_B1, B2, B3: Программы, которые читают данные из stdin (fread) и выводят в stdout (fwrite) и применяют некоторую обработку к данным.

Итак, по порядку:

(1) Родительский процесс A передает данные дочернему процессу B

(2) Дочерний процесс B считывает данные и изменяет их

(3) Дочерний процесс B передает данные обратно в родительский процесс A

(4) Родительский процесс A считывает данные и продолжает их (например, передает их далее процессу B2 ..).

(5) Родительский процесс A передает другой набор данных дочернему процессу B и т. Д.

Проблема заключается в том, что, что бы я ни делал, программа почти всегда заканчивается зависанием для чтения / обработки (или записи / записи?) В или из канала .

Следует отметить одну важную вещь: родительский процесс не может просто закрыть каналы после передачи данных дочернему процессу, поскольку он работает в цикле и хочет передать другой набор данных дочернему процессу после завершения обработки. первый сет.

Вот рабочий набор родительских / дочерних программ (скомпилируйте с помощью g ++ pipe_parent.cc -o pipe_parent, g ++ pipe_child.cc -o pipe_child), иллюстрирующий проблему с неназванными каналами. Я также пробовал именованные каналы, но не так широко. Каждое выполнение может иметь немного другой результат. Если в родительском элементе пропущен оператор sleep или в дочернем элементе пропущен оператор fflush () , каналы почти наверняка будут блокироваться. Если объем передаваемых данных увеличивается, он всегда блокируется независимо от режима сна или fflush.

Родительская программа A:

#include <cstring>
#include <cstdio>
#include <cstdlib>

extern "C" {
  #include <unistd.h>
  #include <fcntl.h>
 }

using namespace std;

/*
 * Parent-child inter-communication
 * Child is external process
 */

int main() {
  int fd[2];
  if( pipe(fd) == -1 ) {
    fprintf(stderr,"Unable to create pipe\n");
  }
  int fd_parentWrite = fd[1];
  int fd_childRead   = fd[0];
  if( pipe(fd) == -1 ) {
    fprintf(stderr,"Unable to create pipe\n");
    exit(-1);
  }
  int fd_childWrite = fd[1];
  int fd_parentRead = fd[0];

  pid_t pid = fork();
  if( pid == -1 ) {
    fprintf(stderr,"Unable to fork new process\n");
    exit(-1);
  }

  if( pid == 0 ) { // Child process
    dup2( fd_childRead,  fileno(stdin)  );  // Redirect standard input(0) to child 'read pipe'
        dup2( fd_childWrite, fileno(stdout) );  // Redirect standard output(1) to child 'write pipe'

    close(fd_parentRead);
    close(fd_parentWrite);
    close(fd_childRead);
    close(fd_childWrite);
    // execl replaces child process with an external one
    int ret = execl("/disk/sources/pipe_test/pipe_child","pipe_child",NULL);
    fprintf(stderr,"External process failed, return code: %d...\n", ret);
    exit(-1);
    // Child process is done. Will not continue from here on
  }
  else { // Parent process
    // Nothing to set up
  }

  // ...more code...

  if( pid > 0 ) { // Parent process (redundant if statement)
    int numElements = 10000;
    int totalSize = numElements * sizeof(float);
    float* buffer = new float[numElements];
    for( int i = 0; i < numElements; i++ ) {
      buffer[i] = (float)i;
    }

    for( int iter = 0; iter < 5; iter++ ) {
      fprintf(stderr,"--------- Iteration #%d -----------\n", iter);
      int sizeWrite = (int)write( fd_parentWrite, buffer, totalSize );
      if( sizeWrite == -1 ) {
        fprintf(stderr,"Parent process write error\n");
        exit(-1);
      }
      fprintf(stderr,"Parent #%d: Wrote %d elements. Total size: %d\n", iter, sizeWrite, totalSize);
      sleep(1);   // <--- CHANGE!
      int sizeRead = (int)read( fd_parentRead, buffer, totalSize );
      if( sizeRead <= 0 ) {
        fprintf(stderr,"Parent process read error\n");
      }
      while( sizeRead < totalSize ) {
        fprintf(stderr,"Parent #%d: Read %d elements, continue reading...\n", iter, sizeRead);
        int sizeNew = (int)read( fd_parentRead, &buffer[sizeRead], totalSize-sizeRead );
        fprintf(stderr," ...newly read %d elements\n", sizeNew);
        if( sizeNew < 0 ) {
          exit(-1);
        }
        sizeRead += sizeNew;
      }
      fprintf(stderr,"Parent #%d: Read %d elements. Total size: %d\n", iter, sizeRead, totalSize);
      fprintf(stderr,"Examples :  %f  %f  %f\n", buffer[0], buffer[10], buffer[100]);
    }

    delete [] buffer;
  }

  close(fd_parentRead);
  close(fd_parentWrite);
  close(fd_childRead);
  close(fd_childWrite);

  return 0;
}

Детская программа B:

#include <cstdio>

using namespace std;

int main() {

  int numElements = 10000;
  int totalSize = numElements * sizeof(float);
  float* buffer = new float[numElements];

  int counter = 0;
  int sizeRead = 0;
  do {
    sizeRead = fread( buffer, 1, totalSize, stdin);
    fprintf(stderr,"Child  #%d: Read %d elements, buffer100: %f\n", counter, sizeRead, buffer[100]);
    if( sizeRead > 0 ) {
      for( int i = 0; i < numElements; i++ ) {
        buffer[i] += numElements;
      }
      int sizeWrite = fwrite( buffer, 1, totalSize, stdout);
      fflush(stdout);  // <--- CHANGE!

      fprintf(stderr,"Child  #%d: Wrote %d elements\n", counter, sizeWrite);
      counter += 1;
    }
  } while( sizeRead > 0 );

  return 0;
}

Есть ли способ проверить, достаточно ли данных для чтения в канале? Или есть альтернативный способ решения вышеуказанной проблемы, с трубами или без них?

Пожалуйста, помогите!

Ответы [ 2 ]

0 голосов
/ 07 июля 2011

Первый ответ (с использованием select, чтобы выяснить, готов ли канал для чтения), был хорошим, но на самом деле не решил мою проблему, см. Также мои предыдущие комментарии.Рано или поздно у меня всегда возникало «состояние гонки», когда программа продолжала зависать на read или write.

. Решение (может быть, не единственное?) Состоит в том, чтобы запуститьпередача данных от ребенка к родителю в другом потоке.Я также вернулся и реализовал каналы как именованные каналы .Это, вероятно, также будет работать с неназванными каналами, но я не проверял это.

Окончательный код приведен ниже.Обратите внимание, что явной очистки не требуется;передача данных между родителями и потомками теперь отделена.Любые комментарии о том, как это можно улучшить, приветствуются!Одна остаточная проблема, которую я вижу, состоит в том, что каналы могут заполняться в зависимости от того, сколько времени ребенок должен обрабатывать данные.Я не уверен, насколько вероятно, что это произойдет.И, кстати, это работало нормально с моими внешними программами, а не только с предоставленной дочерней программой.

Родительская программа A:

#include <cstring>
#include <cstdio>
#include <cstdlib>
#include <string>
#include <iostream>

extern "C" {
  #include <unistd.h>
  #include <fcntl.h>
  #include <sys/stat.h>
  #include <sys/types.h>
  #include <errno.h>
  #include <signal.h>
  #include <sys/wait.h>
  #include <pthread.h>
}

using namespace std;

static int const READING  = -1;
static int const BUFFER_READY = 1;
static int const FINISHED = 0;

/*
 * Parent-child inter-communication
 * Child is external process
 */

struct threadStruct {
  FILE*  file_c2p;
  int    sizeBuffer;
  float* buffer;
  int    io_flag;
};

// Custom sleep function
void mini_sleep( int millisec ) {
  struct timespec req={0},rem={0};
  time_t sec = (int)(millisec/1000);
  millisec    = (int)(millisec-(sec*1000));
  req.tv_sec  = sec;
  req.tv_nsec = millisec*1000000L;
  nanosleep(&req,&rem);
}

// Function to be executed within separate thread: Reads in data from file pointer
// Hand-shaking with main thread is done via the flag 'io_flag'
void *threadFunction( void *arg ) {
  threadStruct* ptr = (threadStruct*)arg;

  ptr->io_flag = READING;
  while( ptr->io_flag != FINISHED ) {
    if( ptr->io_flag == READING ) {
      int sizeRead = fread( ptr->buffer, 1, ptr->sizeBuffer, ptr->file_c2p );
      if( sizeRead <= 0 ) {
        ptr->io_flag = FINISHED;
        return NULL;
      }
      ptr->io_flag = BUFFER_READY;
    }
    else {
      mini_sleep(10);
    }
  }
  return NULL;
}

//--------------------------------------------------
int main() {
  std::string filename_p2c("/tmp/fifo11_p2c");
  std::string filename_c2p("/tmp/fifo11_c2p");

  fprintf(stderr,"..started\n");

  int status = mknod(filename_p2c.c_str(), S_IRUSR | S_IWUSR | S_IFIFO, 0);
  if( (status == -1) && (errno != EEXIST) ) {
    fprintf(stderr,"Error creating named pipe: %s\n", strerror(errno));
    exit(-1);
  }
  status = mknod(filename_c2p.c_str(), S_IRUSR | S_IWUSR | S_IFIFO, 0);
  if( (status == -1) && (errno != EEXIST) ) {
    fprintf(stderr,"Error creating named pipe: %s\n", strerror(errno));
    exit(-1);
  }

  FILE* file_dump = fopen("parent_dump","w");

  int fd_p2c;
  int fd_c2p;
  FILE* file_c2p = NULL;

  //--------------------------------------------------
  // Set up parent/child processes
  //
  pid_t pid = fork();
  if( pid == -1 ) {
    fprintf(stderr,"Unable to fork new process\n");
  }

  if( pid == 0 ) { // Child process
    fd_p2c = open( filename_p2c.c_str(), O_RDONLY );
    if( fd_p2c < 0 ) {
      fprintf(stderr,"Child: Error opening the named pipe: %d %d '%s'\n", fd_p2c, errno, strerror(errno));
      exit(-1);
    }
    fd_c2p = open( filename_c2p.c_str(), O_WRONLY );
    if( fd_c2p < 0 ) {
      fprintf(stderr,"Child: Error opening the named pipe: %d %d '%s'\n", fd_c2p, errno, strerror(errno));
      exit(-1);
    }

    dup2(fd_p2c,fileno(stdin));    // Redirect standard input(0) to child 'read pipe'
    dup2(fd_c2p,fileno(stdout));  // Redirect standard output(1) to child 'write pipe'
    close(fd_p2c);
    close(fd_c2p);

    int ret = execl("/disk/sources/pipe_test/pipe_child","pipe_child",NULL);
    fprintf(stderr,"External process failed, return code: %d...\n", ret);
    kill( getppid(), 9 );  // Kill parent process
    exit(-1);
  }
  else { // Parent process
    fd_p2c = open( filename_p2c.c_str(), O_WRONLY );
    if( fd_p2c < 0 ) {
      fprintf(stderr,"Parent: Error opening the named pipe: %d %d '%s'\n", fd_p2c, errno, strerror(errno));
      exit(-1);
    }
    file_c2p = fopen( filename_c2p.c_str(), "r");
    fd_c2p = fileno( file_c2p );
    if( fd_c2p < 0 ) {
      fprintf(stderr,"Parent: Error opening the named pipe: %d %d '%s'\n", fd_c2p, errno, strerror(errno));
      exit(-1);
    }
  }

  int numElements = 10000;
  int sizeBuffer = numElements * sizeof(float);
  float* bufferIn  = new float[numElements];
  float* bufferOut = new float[numElements];
  for( int i = 0; i < numElements; i++ ) {
    bufferIn[i]  = 0.0;
  }
  int numIterations = 5;
  int numBytesAll = numElements * sizeof(float) * numIterations;

  pthread_t thread;
  threadStruct* threadParam = new threadStruct();
  threadParam->file_c2p   = file_c2p;
  threadParam->sizeBuffer = sizeBuffer;
  threadParam->buffer     = bufferIn;
  threadParam->io_flag    = READING;

  int thread_stat = pthread_create( &thread, NULL, threadFunction, threadParam );
  if( thread_stat < 0 ) {
    fprintf(stderr,"Error when creating thread\n");
    exit(-1);
  }

  int readCounter  = 0;
  int numBytesWrite = 0;
  int numBytesRead  = 0;
  for( int iter = 0; iter < numIterations; iter++ ) {
    for( int i = 0; i < numElements; i++ ) {
      bufferOut[i] = (float)i + iter*numElements*10;
    }

    int sizeWrite = (int)write( fd_p2c, bufferOut, sizeBuffer );
    if( sizeWrite == -1 ) {
      fprintf(stderr,"Parent process write error\n");
      exit(-1);
    }
    numBytesWrite += sizeWrite;
    fprintf(file_dump,"Parent #%d: Wrote %d/%d bytes.\n", iter, numBytesWrite, numBytesAll);

    if( iter == numIterations-1 ) close(fd_p2c);  // Closing output pipe makes sure child receives EOF

    if( threadParam->io_flag != READING ) {
          numBytesRead += sizeBuffer;
      fprintf(file_dump,"Parent #%d: Read  %d/%d bytes. Examples: %f %f\n",
              readCounter, numBytesRead, numBytesAll, bufferIn[1], bufferIn[numElements-1] );
      readCounter += 1;
      if( threadParam->io_flag != FINISHED ) threadParam->io_flag = READING;
    }
  }
  //********************************************************************************
  //

  fprintf(file_dump,"------------------------------\n");

  while( threadParam->io_flag != FINISHED ) {
    if( threadParam->io_flag == BUFFER_READY ) {
      numBytesRead += sizeBuffer;
      fprintf(file_dump,"Parent #%d: Read  %d/%d bytes. Examples: %f %f\n",
              readCounter, numBytesRead, numBytesAll, bufferIn[1], bufferIn[numElements-1] );
      readCounter += 1;
      if( threadParam->io_flag != FINISHED ) threadParam->io_flag = READING;
    }
    else {
      mini_sleep(10);
    }
  }

  // wait for thread to finish before continuing
  pthread_join( thread, NULL );


  fclose(file_dump);
  fclose(file_c2p);
  waitpid(pid, &status, 0); // clean up any children
  fprintf(stderr,"..finished\n");

  delete [] bufferIn;
  delete [] bufferOut;

  return 0;
}

Дочерняя программаB:

#include <cstdio>

using namespace std;

int main() {

  int numElements = 10000;
  int totalSize = numElements * sizeof(float);
  float* buffer = new float[numElements];

  FILE* file_dump = fopen("child_dump","w");

  int counter = 0;
  int sizeRead = 0;
  do {
    sizeRead = fread( buffer, 1, totalSize, stdin);
    if( sizeRead > 0 ) {
      fprintf(file_dump,"Child  #%d: Read  %d bytes, examples:  %f  %f\n", counter, sizeRead, buffer[1], buffer[numElements-1]);
      for( int i = 0; i < numElements; i++ ) {
        buffer[i] += numElements;
      }
      int sizeWrite = fwrite( buffer, 1, totalSize, stdout);
      fprintf(file_dump,"Child  #%d: Wrote %d bytes, examples:  %f  %f\n", counter, sizeRead, buffer[1], buffer[numElements-1]);
      counter += 1;
    }
  } while( sizeRead > 0 );
  fprintf(file_dump,"Child is finished\n");
  fclose(file_dump);
  fclose(stdout);

  return 0;
}
0 голосов
/ 04 июля 2011

Возможно, лучшее решение при чтении - проверить с помощью select, можете ли вы читать с канала. Вы можете даже передать тайм-аут. Альтернативой может быть установка флага O_NONBLOCK в дескрипторе файла 0 (stdin) с помощью fcntl, хотя я думаю, что способ select лучше.

Как и при обеспечении неблокирующей записи: это немного сложнее, так как вы не знаете сколько вы можете написать перед конвейерными блоками. Один из способов (который я считаю очень уродливым) состоит в том, чтобы писать только 1-байтовые куски и снова проверять с помощью select, можете ли вы писать. Но это может привести к снижению производительности, поэтому используйте его, только если производительность при общении не является проблемой.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...