Системный вызов Linux epoll, ожидающий данных - PullRequest
1 голос
/ 13 июня 2019

Тестирование системного вызова Linux epoll с использованием простой программы родитель-потомок.

Ожидаемое поведение

Поскольку дочерний элемент пишет no каждую секунду, родитель должен читать его из канала и записывать no каждую секунду в стандартный вывод.

Фактическое поведение

Родитель ожидает, пока ребенок записывает все nos, а затем читает все данные из канала и записывает в стандартный вывод. Проверено, выполняя strace на родителя. Блокируется в epoll_wait.

пожалуйста, проверьте README в github для получения дополнительной информации

Родитель

#define _GNU_SOURCE
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <error.h>
#include <errno.h>
#include <string.h>
#include <sys/wait.h>
#include <sys/epoll.h>

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define NAMED_FIFO "aFifo"

static void set_nonblocking(int fd) {
  int flags = fcntl(fd, F_GETFL, 0);
  if (flags == -1) {
    perror("fcntl()");
    return;
  }
  if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
    perror("fcntl()");
  }
}

void errExit(char *msg) {
  perror(msg);
  exit(-1);
}

void printArgs(char **argv,char **env) {
  for(int i=0;argv[i];i++)
    printf("argv[%d]=%s\n",i,argv[i]);

  for(int i=0;env[i];i++)
    printf("env[%d]=%s\n",i,env[i]);
}

void PrintNos(short int max,char *name) {
  int fifo_fd,rVal;
  int bSize=2;
  char buffer[bSize];

  fifo_fd = open(NAMED_FIFO,O_RDONLY);
  if(fifo_fd<0)
    errExit("open");

  for(short int i=0;i<max;i++) {
    rVal = read(fifo_fd,buffer,bSize);
    if(rVal != bSize)
      errExit("read");
    printf("%03d\n",i);
  }
}

int main(int argc, char *argv[],char *env[]) {
  //int pipe_fds_child_stdin[2] ;
  int pipe_fds_child_stdout[2] ;
  pid_t child_id ;

  //if( pipe(pipe_fds_child_stdin) < 0 )
  //  errExit("pipe");

  if( pipe(pipe_fds_child_stdout) < 0 )
    errExit("pipe");

  child_id = fork();

  if( child_id > 0 ) {
    const int MAX_POLL_FDS = 2;
    const int BUF_SIZE = 4;

    size_t readSize;
    char buf[BUF_SIZE];
    int status;

    int epoll_fd;
    int nfds ;
    struct epoll_event e_e, e_events[MAX_POLL_FDS];

    memset(e_events,'\0',sizeof(e_events));
    memset(&e_e,'\0',sizeof(e_e));
    //close(pipe_fds_child_stdin[0]);
    close(pipe_fds_child_stdout[1]);

    epoll_fd = epoll_create1(0);
    if(epoll_fd < 0)
      errExit("epoll_create1");

    e_e.data.fd = pipe_fds_child_stdout[0];
    e_e.events  = EPOLLIN;

    if(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, pipe_fds_child_stdout[0], &e_e) < 0)
      errExit("epoll_ctl");

    while(1) {
      nfds = epoll_wait(epoll_fd, e_events,MAX_POLL_FDS,-1);
      if( nfds < 0)
        errExit("epoll_wait");

      for(int i=0;i<nfds;i++) {
        if( e_events[i].data.fd == pipe_fds_child_stdout[0]) {
          if( e_events[i].events & EPOLLIN) {
            readSize = read(pipe_fds_child_stdout[0],buf,BUF_SIZE);
            if( readSize == BUF_SIZE ) {
              write(STDOUT_FILENO,buf,BUF_SIZE);
            } else if(readSize == 0) { // eof
              errExit("readSize=0");
            } else {
              errExit("read");
            }
          } else if( e_events[i].events & EPOLLHUP) {
            printf("got EPOLLHUP on pipefd\n");
            wait(&status);
            exit(0);
          } else {
            errExit("Unexpected event flag returned by epoll_wait on waited fd");
          }
        } else  {
          errExit("epoll_wait returned non-awaited fd");
        }
      }
    }
  } else if( child_id == 0 ) {
    close(0);
    close(1);
    //close(pipe_fds_child_stdin[1]);
    close(pipe_fds_child_stdout[0]);

    //dup2(pipe_fds_child_stdin[0],0);
    dup2(pipe_fds_child_stdout[1],1);

    execvpe(argv[1],&(argv[1]),env);
    //PrintNos(100,"P");
    //errExit("execvp");
  } else {
    errExit("fork");
  }
}

Дети

import sys
import time
import os
#f=open("aFifo",'r')
for x in range(10):
    #try:
    #    val = f.read(2)
    #except Exception as e:
    #    raise 
    time.sleep(1)
    print(f'{x:03d}')

1 Ответ

0 голосов
/ 14 июня 2019

Это связано с буферизацией Python, которую можно отключить, передав параметр -u в python.


После долгих поисков и исследований понял, что это из-за конвейерного буфера. Хотя клиент пишет, он находится в буфере канала. Только после того, как буфер канала заполнен, ядро ​​отправляет событие ready для этого дескриптора. Минимальный размер страницы, ядро ​​не позволяет установить ниже этого. Но это может быть увеличено. Добрался до этого, переключившись с epoll на опрос / выбор. После изменения опроса / выбора поведение было таким же. Блокировка данных была доступна в конвейере.

import fcntl
import os

F_SETPIPE_SZ=1031
fds = os.pipe()
for i in range(5):
    print(fcntl.fcntl(fds[0],F_SETPIPE_SZ,64))


$ python3.7 pipePageSize.py 
4096
4096

Это модифицированный клиент. Соответствующие изменения на сервере тоже.

import time

pageSize=1024*8

for x in range(100):
    time.sleep(0.5)
    print(f'{x:-{pageSize}d}')
...