Тупик при использовании fork, exec и pipe в параллельной среде - PullRequest
0 голосов
/ 27 мая 2019

Я порождаю дочерний процесс, используя fork и exec.Использование двух каналов для обеспечения ввода и получения вывода из этого процесса.

В большинстве случаев он работает просто отлично, но когда я использую что-то вроде openmp, чтобы проверить, как оно работает в параллельных средах, оно зависает в системном вызове readили waitpid иногда.

Когда я strace редактировал дочерний процесс, я обнаружил, что он также заблокирован в системном вызове read.Что странно, потому что я жду чтения в родительском процессе только после того, как я предоставил весь свой ввод и закрыл конец записи канала.

Я попытался создать MVCE, ноэто вроде долго.Я не знаю, как сделать это немного короче.Ради простоты я удалил большую часть кода проверки ошибок.

Обратите внимание, что в моем коде нет глобальных переменных.И я не пытаюсь читать / писать из одних и тех же файловых дескрипторов в нескольких потоках.

Я не могу думать о том, что может пойти не так.Так что, надеюсь, вы, ребята, поймете, что я делаю не так.

Вот так:

#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <limits.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>

size_t
min(size_t first, size_t second)
{
    if(first < second)
    {
        return first;
    }

    return second;
}

struct RDI_Buffer
{
    char* data;
    size_t size;
};

typedef struct RDI_Buffer RDI_Buffer;

RDI_Buffer
rdi_buffer_init()
{
    RDI_Buffer b = {0};
    return b;
}

RDI_Buffer
rdi_buffer_new(size_t size)
{
    RDI_Buffer b;

    b.data = malloc(size);
    b.size = size;
    return b;
}

void
rdi_buffer_free(RDI_Buffer b)
{
    if(!b.data)
    {
        return;
    }

    free(b.data);
}

RDI_Buffer
rdi_buffer_resize(RDI_Buffer b, size_t new_size)
{
    if(!b.data)
    {
        return rdi_buffer_new(new_size);
    }

    char* new_data = realloc(b.data, new_size);

    if(new_data)
    {
        b.size = new_size;
        b.data = new_data;
        return b;
    }

    RDI_Buffer output = rdi_buffer_new(new_size);
    memcpy(output.data, b.data, output.size);
    rdi_buffer_free(b);
    return output;
}

RDI_Buffer
rdi_buffer_null_terminate(RDI_Buffer b)
{
    b = rdi_buffer_resize(b, b.size + 1);
    b.data[b.size - 1] = '\0';
    return b;
}

static RDI_Buffer
rw_from_fd(int w_fd, int r_fd, RDI_Buffer input)
{
    const size_t CHUNK_SIZE = 4096;

    assert(input.size <= CHUNK_SIZE);

    write(w_fd, input.data, input.size);
    close(w_fd);

    RDI_Buffer output = rdi_buffer_new(CHUNK_SIZE);

    read(r_fd, output.data, CHUNK_SIZE);

    close(r_fd);
    return output;
}

int main()
{
#pragma omp parallel for
    for(size_t i = 0; i < 100; i++)
    {
        char* thing =
                "Hello this is a sort of long text so that we can test how "
                "well this works. It should go with cat and be printed.";

        RDI_Buffer input_buffer;
        input_buffer.data = thing;
        input_buffer.size = strlen(thing);

        int main_to_sub[2];
        int sub_to_main[2];

        pipe(main_to_sub);
        pipe(sub_to_main);

        int pid = fork();

        if(pid == 0)
        {
            dup2(main_to_sub[0], STDIN_FILENO);
            dup2(sub_to_main[1], STDOUT_FILENO);

            close(main_to_sub[1]);
            close(main_to_sub[0]);
            close(sub_to_main[1]);
            close(sub_to_main[0]);

            char* argv[] = {"cat", NULL};

            execvp("cat", argv);
            exit(1);
        }

        close(main_to_sub[0]);
        close(sub_to_main[1]);

        RDI_Buffer output =
                rw_from_fd(main_to_sub[1], sub_to_main[0], input_buffer);

        int *status = NULL;
        waitpid(pid, status, 0);

        if(status)
        {
            printf("%d\n", *status);
        }

        output = rdi_buffer_null_terminate(output);

        if(strcmp(output.data, thing) == 0)
        {
            printf("good\n");
        }
        else
        {
            printf("bad\n");
        }

        rdi_buffer_free(output);
    }
}

Убедитесь, что вы компилируете и связываетесь с -fopenmp.Вот так: gcc main.c -fopenmp

Ответы [ 3 ]

2 голосов
/ 29 мая 2019

Когда ваш основной сервер завис, введите lsof в отдельном сеансе. Я думаю, вы увидите что-то вроде:

....
cat       5323                 steve  txt       REG              252,0    52080    6553613 /bin/cat
cat       5323                 steve  mem       REG              252,0  1868984   17302005 /lib/x86_64-linux-gnu/libc-2.23.so
cat       5323                 steve  mem       REG              252,0   162632   17301981 /lib/x86_64-linux-gnu/ld-2.23.so
cat       5323                 steve  mem       REG              252,0  1668976   12849924 /usr/lib/locale/locale-archive
cat       5323                 steve    0r     FIFO               0,10      0t0      32079 pipe
cat       5323                 steve    1w     FIFO               0,10      0t0      32080 pipe
cat       5323                 steve    2u      CHR              136,0      0t0          3 /dev/pts/0
cat       5323                 steve    3r     FIFO               0,10      0t0      32889 pipe
cat       5323                 steve    4w     FIFO               0,10      0t0      32889 pipe
cat       5323                 steve    6r     FIFO               0,10      0t0      32890 pipe
cat       5323                 steve    7r     FIFO               0,10      0t0      34359 pipe
cat       5323                 steve    8w     FIFO               0,10      0t0      32890 pipe
cat       5323                 steve   10r     FIFO               0,10      0t0      22504 pipe
cat       5323                 steve   15w     FIFO               0,10      0t0      22504 pipe
cat       5323                 steve   16r     FIFO               0,10      0t0      22505 pipe
cat       5323                 steve   31w     FIFO               0,10      0t0      22505 pipe
cat       5323                 steve   35r     FIFO               0,10      0t0      17257 pipe
cat       5323                 steve   47r     FIFO               0,10      0t0      31304 pipe
cat       5323                 steve   49r     FIFO               0,10      0t0      30264 pipe

, что поднимает вопрос, откуда все эти трубы? Ваш основной цикл больше не является одним циклом, это набор несинхронизированных параллельных циклов. Посмотрите на шаблон ниже:

void *tdispatch(void *p) {
      int to[2], from[2];
      pipe(to);
      pipe(from);
      if (fork() == 0) {
          ...
      } else {
          ...
          pthread_exit(0); 
     }
}
...
for (int i = 0; i < NCPU; i++) {
    pthread_create(..., tdispatch, ...);
}
for (int i = 0; i < NCPU; i++) {
    pthread_join(...);
}

Несколько экземпляров tdispatch могут чередовать вызовы pipe (to), pipe (from) и fork (); таким образом, fds просачивается в эти процессы. Я говорю об утечке, потому что процесс разветвленного не имеет представления о том, что они там есть.

Канал продолжает отвечать на системные вызовы read (), пока у него либо есть буферизованные данные, либо открыт хотя бы один дескриптор файла записи.

Предположим, что у процесса 5 нормальные два конца двух открытых труб, указывающие на трубу № 10 и трубу № 11; и процесс 6 имеет трубу № 12 и трубу № 13. Но из-за утечки выше процесс 5 также имеет конец записи канала # 12, а процесс 6 имеет конец записи канала # 10. Процесс 5 и 6 никогда не завершится, потому что они поддерживают открытые каналы чтения друг друга.

Решение в значительной степени соответствует тому, о чем говорили ранее: нити и вилки - сложная комбинация. Чтобы заставить его работать, вам нужно было бы сериализовать биты pipe, fork, initial-close.

0 голосов
/ 29 мая 2019

Причиной проблемы, как оказалось, являются открытые файлы, которые наследуются дочерним процессам, как объяснили Джонатан Леффлер и Мевет в своих ответах.Пожалуйста, прочитайте их ответы, если у вас возникла эта проблема, а затем вернитесь к моему ответу, если вы все еще не понимаете или не знаете, что делать.

Я поделюсь своим объяснением так, как я бы сразу понял,Также поделитесь решением моего кода для решения этой проблемы.

Рассмотрим следующий сценарий: Процесс A открывает канал (который состоит из двух файлов).

Процесс A порождает процесс B для связи с ним через канал,Однако он также создает процесс C, который наследует канал (два файла).

Теперь процесс B будет непрерывно вызывать read(2) в канале, который является системным вызовом блокировки.(Он будет ждать, пока кто-нибудь запишет в канал)

Процесс A заканчивает запись и закрывает свой конец канала.Обычно это приводит к сбою системного вызова read(2) в процессе B, и программа завершает работу (это то, что мы хотим).

Однако в нашем случае, поскольку процесс C имеет открытый конец записи канала,read(2) Системный вызов в процессе B не завершится сбоем и заблокирует ожидание записи с открытого конца записи в процессе C.

Все будет хорошо, когда процесс C. только завершится.

настоящий тупик возникнет в другом сценарии, где и B, и C удерживают каналы друг для друга (как объяснено в ответе Мевета).Каждый из них будет ждать, пока другой закроет свои концы труб.Что никогда не случится, вызывая тупик.

Мое решение состояло в том, чтобы закрыть все открытые файлы, которые мне не нужны, сразу после fork(2)

int pid = fork();

if(pid == 0)
{
    int exceptions[2] = {main_to_sub[0], sub_to_main[1]};
    close_all_descriptors(exceptions);
    dup2(main_to_sub[0], STDIN_FILENO);
    dup2(sub_to_main[1], STDOUT_FILENO);

    close(main_to_sub[0]);
    close(sub_to_main[1]);

    char* argv[] = {"cat", NULL};

    execvp("cat", argv);
    exit(1);
}

Вот реализация close_all_descriptors

#include <fcntl.h>
#include <errno.h>

static int
is_within(int fd, int arr[2])
{
    for(int i = 0; i < 2; i++)
    {
        if(fd == arr[i])
        {
            return 1;
        }
    }

    return 0;
}

static int
fd_is_valid(int fd)
{
    return fcntl(fd, F_GETFD) != -1 || errno != EBADF;
}

static void
close_all_descriptors(int exceptions[2])
{
    // getdtablesize returns the max number of files that can be open. It's 1024 on my system
    const int max_fd = getdtablesize();

    // starting at 3 because I don't want to close stdin/out/err
    // let dup2(2) do that
    for (int fd = 3; fd <= max_fd; fd++)
    {
        if(fd_is_valid(fd) && !is_within(fd, exceptions))
        {
            close(fd);
        }
    }
}
0 голосов
/ 27 мая 2019

Преобразование комментариев в ответ.

Возможно, вам не хватает файловых дескрипторов.При параллельности 100 итераций цикла, которые создают 4 файловых дескриптора на каждую итерацию, могут привести к проблемам, если ограничение составляет около 256 дескрипторов.Да, вы закрываете некоторые из них быстро, но достаточно быстро?Это не ясно.А неопределенность планирования легко может объяснить изменяющееся поведение.

Насколько я понимаю, openmp состоит в том, что он входит в тело цикла n раз в то время, когда n - количество потоков (я ошибаюсь?).Поэтому в любой момент времени у меня никогда не должно быть более n * 2 файловых дескрипторов, которые на моем компьютере должны быть около 24.

Вероятно, n * 4 файловых дескриптора, но могут быть ограничения на параллелизм,Я недостаточно знаком с OpenMP, чтобы авторитетно комментировать это.Есть ли прагмы, кроме цикла for, который должен быть установлен?Мне не ясно, что запуск показанного кода привел к параллелизму на Mac, когда код скомпилирован с Clang - который не жалуется на #pragma, в отличие от GCC 9.1.0, который предупреждает о неизвестной прагме под моимпараметры компиляции по умолчанию.

Однако, с форками и execs, а также потоками, жизнь становится хитрой.Файловые дескрипторы могут не закрываться, поэтому их следует закрывать, потому что файловые дескрипторы являются ресурсом уровня процесса, поэтому поток 1 может создавать файловые дескрипторы, о которых поток 2 не знает, но которые он разделяет.И затем, когда поток 2 разветвляется, дескрипторы файлов, созданные потоком 1, не закрываются, что не позволяет cat правильно определить EOF и т. Д.

Один из способов убедиться в этом - использовать такую ​​функцию, как эта:

#include <sys/stat.h>

static void dump_descriptors(int max_fd)
{
    struct stat sb;
    for (int fd = 0; fd <= max_fd; fd++)
        putchar((fstat(fd, &sb) == 0) ? 'o' : '-');
    putchar('\n');
    fflush(stdout);
}

и в дочернем коде назовите его подходящим номером (возможно, 64 - может быть случай использования числа, равного 404).Хотя заманчиво попытаться использовать flockfile(stdout) и funlockfile(stdout) в функции, бессмысленно, если он вызывается только в дочернем процессе, потому что дочерний процесс однопоточный и, следовательно, не будет никакого вмешательства со стороны других потоков впроцесс.Однако слишком возможно, чтобы разные процессы могли мешать выводу друг друга.

Если вы собираетесь использовать dump_descriptor() из потоков родительского процесса, то добавьте flockfile(stdout); перед циклом и funlockfile(stdout); после fflush() звонка.Я не уверен, насколько это будет мешать проблеме;он обеспечивает однопоточное выполнение этой функции, потому что ни один из других потоков не может записывать в stdout, пока один поток заблокирован.

Однако, когда я тестировал его с немного измененной версией кода, который выводит PIDдо «хороших» и «плохих» строк и до вывода dump_descriptors() я никогда не видел чередования операций.Я получил вывод вроде:

14128: ooooooo----------------------------------------------------------
14128: good
14129: ooooooo----------------------------------------------------------
14129: good
14130: ooooooo----------------------------------------------------------
14130: good
…
14225: ooooooo----------------------------------------------------------
14225: good
14226: ooooooo----------------------------------------------------------
14226: good
14227: ooooooo----------------------------------------------------------
14227: good

, который настоятельно указывает на отсутствие параллелизма в коде.И когда нет параллелизма, вы не увидите проблемы.Каждый раз есть 4 дескриптора для каналов, и код тщательно закрывает их.

Подумайте о перенаправлении карты дескрипторов в файл (или один файл на каждого ребенка) в вашем сценарии, где вы, возможно, становитесь серьезнымипараллелизм.

Обратите внимание, что смешивание нитей с fork() по своей сути сложно (как Джон Боллинджер отметил ) - обычно вы используете один или другой механизм, а не оба.

...