Неблокирующее чтение / запись в stdin / stdout в C на Linux или Mac - PullRequest
0 голосов
/ 04 января 2019

У меня есть две программы, взаимодействующие через именованные каналы (на Mac), но размер буфера именованных каналов слишком мал.Программа 1 записывает 50 Кбайт в канал 1 перед чтением канала 2. Именованные каналы имеют размер 8 КБ (в моей системе), поэтому программа 1 блокируется, пока данные не будут использованы.Программа 2 считывает 20 Кбайт из канала 1, а затем записывает 20 Кбайт в канал 2.Pipe2 не может держать 20K, поэтому программа 2 теперь блокирует.Он будет выпущен только тогда, когда программа 1 выполнит чтение.Но программа 1 заблокирована в ожидании программы 2. deadlock

Я думал, что смогу решить эту проблему, создав программу-прокладку, которая читает stdin неблокирующую и записывает stdout неблокирующую, временно сохраняя данные в большом буфере,Я протестировал программу, используя данные кошки |./Gasket 0 |./gasket 1> out, ожидая, что будет копия данных.Однако, хотя первый вызов прокладки работает, как и ожидалось, чтение во второй программе возвращает 0 до того, как все данные будут использованы, и никогда не возвращает ничего, кроме 0 в последующих вызовах.

Я пробовал код ниже, обана MAC и Linux.Оба ведут себя одинаково.Я добавил ведение журнала, чтобы видеть, что при перезапуске второго вызова набивки не происходит никаких данных, даже если он не прочитал все данные, записанные первым вызовом.

#include <stdio.h>
#include <fcntl.h>
#include <time.h>
#include <stdlib.h>
#include <unistd.h>

#define BUFFER_SIZE 100000
char buffer[BUFFER_SIZE];
int elements=0;
int main(int argc, char **argv)
{
  int total_read=0, total_write=0;
  FILE *logfile=fopen(argv[1],"w");

  int flags = fcntl(fileno(stdin), F_GETFL, 0);
  fcntl(fileno(stdin), F_SETFL, flags | O_NONBLOCK);
  flags = fcntl(fileno(stdout), F_GETFL, 0);
  fcntl(fileno(stdout), F_SETFL, flags | O_NONBLOCK);

  while (1) {
    int num_read=0;
    if (elements < (BUFFER_SIZE-1024)) { // space in buffer
      num_read = fread(&buffer[elements], sizeof(char), 1024, stdin);
      elements += num_read;
      total_read += num_read;
      fprintf(logfile,"read %d (%d) elements \n",num_read, total_read); fflush(logfile);
    }
    if (elements > 0) { // something in buffer that we can write
      int num_written = fwrite(&buffer[0],sizeof(char),elements, stdout); fflush(stdout);
      total_write += num_written;
      fprintf(logfile,"wrote %d (%d) elements \n",num_written, total_write); fflush(logfile);
      if (num_written > 0) { // copy data to top of buffer
        for (int i=0; i<(elements-num_written); i++) {
          buffer[i] = buffer[i+num_written];
        }
        elements -= num_written;
      }
    }
  }
}

Я полагаю,мог бы сделать прокладку многопоточным и использовать блокировку чтения в одном потоке и блокировку записи в другом, но я хотел бы понять, почему неблокирующая операция ввода-вывода кажется мне сломанной.

Спасибо!

1 Ответ

0 голосов
/ 04 января 2019

Мое общее решение для любого проекта IPC - сделать неблокирующий ввод-вывод клиента и сервера.Для этого требуется поместить данные в очередь как при записи, так и при чтении, чтобы обрабатывать случаи, когда ОС не может читать / писать или может только читать / записывать часть вашего сообщения.

Код ниже, вероятно, будет выглядетьЧРЕЗВЫЧАЙНО излишне, но если вы заставите его работать, вы можете использовать его до конца своей карьеры, будь то для именованных каналов, сокетов, сети, назовите его.

В псевдокоде:

typedef struct {
  const char* pcData, * pcToFree; // pcData may no longer point to malloc'd region
  int   iToSend;
} DataToSend_T;

queue of DataToSend_T qdts;

// Caller will use malloc() to allocate storage, and create the message in
// that buffer.  MyWrite() will free it now, or WritableCB() will free it
// later.  Either way, the app must NOT free it, and must not even refer to
// it again.

MyWrite( const char* pcData, int iToSend ) {
  iSent = 0;

  // Normally the OS will tell select() if the socket is writable, but if were hugely
  // compute-bound, then it won't have a chance to.  So let's call WritableCB() to
  // send anything in our queue that is now sendable.  We have to send the data in
  // order, of course, so can't send the new data until the entire queue is done.
  WritableCB();

  if ( qdts has no entries ) {
     iSent = write( pcData, iToSend );
      // TODO: check error
      // Did we send it all?  We're done.
      if ( iSent == iToSend ) {
          free( pcData );
          return;
      }
  }

  // OK, either 1) we had stuff queued already meaning we can't send, or 2)
  // we tried to send but couldn't send it all.
  add to queue qdts the DataToSend ( pcData + iSent, pcData, iToSend - iSent );
}



WritableCB() {
  while ( qdts has entries ) {
      DataToSend_T* pdts = qdts head;
      int iSent = write( pdts->cData, pdts->iToSend );
      // TODO: check error
      if ( iSent == pdts->iToSend ) {
          free( pdts->pcToFree );
          pop the front node off qdts
      else {
          pdts->pcData  += iSent;
          pdts->iToSend -= iSent;
          return;   
      }
  }
}



// Off-subject but I like a TINY buffer as an original value, that will always
// exercise the "buffer growth" code for almost all usage, so we're sure it works.
// If the initial buffer size is like 1M, and almost never grows, then the grow code
// may be buggy and we won't know until there's a crash years later.

int iBufSize = 1, iEnd = 0;  iEnd is the first byte NOT in a message
char* pcBuf = malloc( iBufSize );

ReadableCB() {
  // Keep reading the socket until there's no more data.  Grow buffer if necessary.
  while (1) {
      int iRead = read( pcBuf + iEnd, iBufSize - iEnd);
      // TODO: check error
      iEnd += iRead;

      // If we read less than we had space for, then read returned because this is
      // all the available data, not because the buffer was too small.
      if ( iRead < iBufSize - iEnd )
          break;

      // Otherwise, double the buffer and try reading some more.
      iBufSize *= 2;
      pcBuf = realloc( pcBuf, iBufSize );
  }

  iStart = 0;
  while (1) {
      if ( pcBuf[ iStart ] until iEnd-1 is less than a message ) {
          // If our partial message isn't at the front of the buffer move it there.
          if ( iStart ) {
              memmove( pcBuf, pcBuf + iStart, iEnd - iStart );
              iEnd -= iStart;
          }
          return;
      }
      // process a message, and advance iStart by the size of that message.
  }
}



main() {
  // Do your initial processing, and call MyWrite() to send and/or queue data.

  while (1) {
       select() // see man page
       if ( the file handle is readable )
           ReadableCB();
       if ( the file handle is writable )
           WritableCB();
       if ( the file handle is in error )
           // handle it;
       if ( application is finished )
           exit( EXIT_SUCCESS );
  }
}
...