чтение из трубы в c - PullRequest
       3

чтение из трубы в c

0 голосов
/ 05 октября 2011

Сценарий: скажем, у меня есть 8 файлов, в которые я хочу отсортировать все данные чисел в порядке от наименьшего к наибольшему. Только листовые процессы могут сортировать все числа, содержащиеся в файле. Эти конечные процессы должны отправлять отсортированные данные в родительский процесс через каналы. Этот родительский процесс будет сравнивать полученные и отправленные данные, число которых будет меньше для следующего процесса. Это будет происходить до тех пор, пока все числа в трубе не станут пустыми.

Так что думайте об этом как о дереве. У нас есть один основной процесс. При сортировке 8 файлов мастер-процесс порождает 2 процесса (левый и правый). Эти два новых процесса будут порождать их собственные процессы. Это будет происходить до тех пор, пока внизу не будет 8 листовых процессов. Внутренние узлы могут содержать только один номер. Они будут передавать свои номера по ряду каналов, пока не достигнут мастер-процесса. Главный процесс выведет свое содержимое в файл.

Я включил код здесь (поскольку он немного длинный, но простой).

Это работает, если у меня есть 2 файла для сортировки. Итак, у нас есть 1 мастер-процесс, а затем двое детей. Двое детей сортируют номера своих файлов и затем передают их. Затем основной процесс выводит данные по порядку из каналов. Однако, если я добавлю некоторую сложность (4 файла), конечные процессы все еще отправляют свои данные вверх, однако, когда главный процесс начинает читать из внутренних каналов каналов, он думает, что он пуст и завершает программу без каких-либо данных.

Есть идеи, почему главный процесс думает, что его левая и правая трубы пусты? Как я уже сказал, прекрасно работает, когда есть один родитель и 2 детей. Больше обрабатывает, и это терпит неудачу. (при условии, что обработка произойдет в степени 2).

ПРИМЕЧАНИЕ: perror используется для целей отладки.

полная программа здесь [очень грязно, так как я много с ней делал, но она скомпилируется.

1 Ответ

1 голос
/ 05 октября 2011

Обновленный код в Pastebin не является компилируемой функцией - не говоря уже о полной программе. Из-за этого трудно сказать, что на самом деле не так.

Однако одна из проблем заключается во фрагменте кода:

if (pipe(upPipe) < 0 || pipe(leftPipe) < 0 || pipe(rightPipe) < 0)
    ...error exit...
if ((leftPID = fork()) < 0)
    ...error exit...

if(leftPID == 0){
    fMax = ((fMax)/2);
    dup2(leftPipe[WRITE], upPipe[WRITE]);
    pipe(leftPipe);
    pipe(rightPipe);

Звонок на dup2() странный; вы тщательно сопоставляете канал записи левого канала с каналом записи восходящего канала.

Два pipe() вызова после dup2() довольно быстро испортили все в левом дочернем элементе, открыв еще 4 дескриптора файла, но потеряв предыдущие значения, сохраненные в leftPipe и rightPipe.

Вы должны сделать вашу формулировку проблемы более понятной. Я не могу понять, что у вас есть то, что вы должны иметь. Есть вызов convertToInt(), который не принимает аргументов и не возвращает значения; что на земле это делает? Есть звонок на freeMem(); не ясно, что это делает.

z.c:42: error: ‘numberChar’ undeclared (first use in this function)
z.c:42: error: ‘sizeNumbers’ undeclared (first use in this function)
z.c:43: warning: implicit declaration of function ‘readFile’
z.c:43: error: ‘fileNames’ undeclared (first use in this function)
z.c:45: warning: implicit declaration of function ‘convertToInt’
z.c:46: error: ‘i’ undeclared (first use in this function)
z.c:46: error: ‘numbs’ undeclared (first use in this function)
z.c:47: error: ‘numbers’ undeclared (first use in this function)
z.c:48: warning: implicit declaration of function ‘freeMem’

Извините, ваш вопрос не отвечает, потому что вы не даете нам:

  1. Точные требования.
  2. Код, который вы на самом деле получили.

В вашем коде нет четкого разделения функций. Используете ли вы VCS (система контроля версий - например, git)? Если нет, вы должны. Я сделал измененную версию ниже - которая, по сути, является полной перепиской - за 9 проверок, и, вероятно, должна была сделать более мелкие проверки, чем эта. Но использование VCS было крайне важно для меня; это позволило мне с уверенностью вносить изменения, зная, что я не потеряю ничего ценного. И мне не нужно было комментировать код; Я удалил вещи, которые я не хотел. Решение ниже - 261 строка; оригинал содержал около 687 строк, включая много закомментированного кода; когда я закончил вычеркивать комментарии и т. д., он сократился до 469 строк.

Когда я запустил ваш код (и сообщил о том, какие файлы открываются каждым дочерним элементом), я обнаружил, что два файла открывают каждый из файлов 2 и 3 (и поскольку файлов данных в то время не существовало) в этот момент они потерпели неудачу).

Пересмотренный код имеет почти чистую структуру; нечетный бит - это фаза convertToString (), которая считывает двоичные целые числа из канала и снова конвертирует их в вывод ASCII. Оно работает; Я не уверен, что это элегантно. Вместо использования массива жестко закодированных имен файлов, он берет произвольный список имен файлов из командной строки (он не должен быть 8; он был протестирован с 0 по 8, и у меня нет причин думать он не справится с 20 или более). Я провел достаточное количество испытаний с:

./piped-merge-sort [1-8]

Имеется много диагностических данных. Я использовал две функции, которые мне очень помогли в моей работе - они упакованы вместе с другим связанным кодом в более сложный пакет, но простые версии функций err_error() и err_remark() действительно помогают мне. Обратите внимание, что эти версии сообщают PID процесса отчетности для каждого вызова. Они также осторожны, чтобы предварительно отформатировать сообщение в строку, а затем записать строку за один раз до стандартной ошибки; в противном случае я получал много чередующегося вывода, которое в лучшем случае сбивало с толку.

'Нуфф сказал - вот код:

#include <assert.h>
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/wait.h>
#include <unistd.h>

/* Error reporting */
static void err_vremark(char *fmt, va_list args)
{
    char buffer[256];
    int errnum = errno;
    int buflen = snprintf(buffer, sizeof(buffer), "%d: ", (int)getpid());
    buflen += vsnprintf(buffer + buflen, sizeof(buffer) - buflen, fmt, args);
    if (errnum != 0)
        buflen += snprintf(buffer + buflen, sizeof(buffer) - buflen,
                           ": errno = %d (%s)", errnum, strerror(errnum));
    fprintf(stderr, "%s\n", buffer);
}

static void err_error(char *fmt, ...)
{
    va_list args;
    va_start(args, fmt);
    err_vremark(fmt, args);
    va_end(args);
    exit(1);
}

static void err_remark(char *fmt, ...)
{
    va_list args;
    va_start(args, fmt);
    err_vremark(fmt, args);
    va_end(args);
}

enum { READ = 0 };
enum { WRITE = 1 };
enum { BUFFER_SIZE = 256 };

static int *a_data = 0;
static int  a_used = 0;
static int  a_size = 0;

void readFile(char const *fileName);
void freeMem(void);
void sortArray(void);
int  intcmp(void const *n1, void const *n2);

static void sortMergeFiles(int fd, int number, char **names);
static void sortOneFile(int fd, const char *file);
static void convertToString(int fd, FILE *fp);

int main(int argc, char **argv)
{
    int m_pipe[2];
    pid_t pid;
    if (pipe(m_pipe) < 0)
        err_error("Failed to create master pipe");
    if ((pid = fork()) < 0)
        err_error("Failed to fork master");
    else if (pid == 0)
    {
        close(m_pipe[READ]);
        sortMergeFiles(m_pipe[WRITE], argc - 1, &argv[1]);
        close(m_pipe[WRITE]);
    }
    else
    {
        close(m_pipe[WRITE]);
        convertToString(m_pipe[READ], stdout);
        close(m_pipe[READ]);
    }
    return 0;
}

static void convertToString(int fd, FILE *fp)
{
    int value;
    while (read(fd, &value, sizeof(int)) == sizeof(int))
        fprintf(fp, "%d\n", value);
}

static int readInteger(int fd, int *value)
{
    if (read(fd, value, sizeof(int)) != sizeof(int))
        return EOF;
    return 0;
}

static void writeInteger(int fd, int value)
{
    if (write(fd, &value, sizeof(int)) != sizeof(int))
        err_error("Failed to write integer to fd %d", fd);
}

static void mergeFiles(int fd_in1, int fd_in2, int fd_out)
{
    int value_1;
    int value_2;
    int eof_1 = readInteger(fd_in1, &value_1);
    int eof_2 = readInteger(fd_in2, &value_2);

    while (eof_1 != EOF && eof_2 != EOF)
    {
        err_remark("v1: %d; v2: %d", value_1, value_2);
        if (value_1 <= value_2)
        {
            writeInteger(fd_out, value_1);
            eof_1 = readInteger(fd_in1, &value_1);
        }
        else
        {
            writeInteger(fd_out, value_2);
            eof_2 = readInteger(fd_in2, &value_2);
        }
    }

    while (eof_1 != EOF)
    {
        err_remark("v1: %d", value_1);
        writeInteger(fd_out, value_1);
        eof_1 = readInteger(fd_in1, &value_1);
    }

    while (eof_2 != EOF)
    {
        err_remark("v2: %d", value_2);
        writeInteger(fd_out, value_2);
        eof_2 = readInteger(fd_in2, &value_2);
    }
}

static void sortMergeFiles(int fd, int number, char **names)
{
    assert(number >= 0);
    if (number == 0)
        return;
    else if (number == 1)
        sortOneFile(fd, names[0]);
    else
    {
        err_remark("Non-Leaf: processing %d files (%s .. %s)", number, names[0], names[number-1]);
        int mid = number / 2;
        int l_pipe[2];
        int r_pipe[2];
        pid_t l_pid;
        pid_t r_pid;
        if (pipe(l_pipe) < 0 || pipe(r_pipe) < 0)
            err_error("Failed to create pipes");
        if ((l_pid = fork()) < 0)
            err_error("Failed to fork left child");
        else if (l_pid == 0)
        {
            close(l_pipe[READ]);
            close(r_pipe[READ]);
            close(r_pipe[WRITE]);
            sortMergeFiles(l_pipe[WRITE], mid, names);
            close(l_pipe[WRITE]);
            exit(0);
        }
        else if ((r_pid = fork()) < 0)
            err_error("Failed to fork right child");
        else if (r_pid == 0)
        {
            close(r_pipe[READ]);
            close(l_pipe[READ]);
            close(l_pipe[WRITE]);
            sortMergeFiles(r_pipe[WRITE], number - mid, names + mid);
            close(r_pipe[WRITE]);
            exit(0);
        }
        else
        {
            close(l_pipe[WRITE]);
            close(r_pipe[WRITE]);
            mergeFiles(l_pipe[READ], r_pipe[READ], fd);
            close(l_pipe[READ]);
            close(r_pipe[READ]);
            err_remark("Non-Leaf: finished %d files (%s .. %s)", number, names[0], names[number-1]);
        }
    }
}

static void addNumberToArray(int number)
{
    assert(a_used >= 0 && a_used <= a_size);
    if (a_used == a_size)
    {
        int  n_size = (a_size + 1) * 2;
        int *n_data = realloc(a_data, sizeof(*n_data) * n_size);
        if (n_data == 0)
            err_error("Failed to allocate space for %d numbers", n_size);
        a_data = n_data;
        a_size = n_size;
    }
    a_data[a_used++] = number;
}

/* Could be compressed to write(fd, a_data, a_used * sizeof(int)); */
/* Arguably should check for write errors - but not SIGPIPE */
static void writeArray(int fd)
{
    for (int i = 0; i < a_used; i++)
    {
        err_remark("Write: %d", a_data[i]);
        write(fd, &a_data[i], sizeof(int));
    }
}

void readFile(char const *fileName)
{
    char buffer[BUFFER_SIZE];
    FILE *fp;
    fp = fopen(fileName, "r");

    if (fp == NULL)
        err_error("Failed to open file %s for reading", fileName);

    while (fgets(buffer, sizeof(buffer), fp) != NULL)
    {
        char *nl = strchr(buffer, '\n');
        if (nl != 0)
            *nl = '\0';
        err_remark("Line: %s", buffer);
        addNumberToArray(atoi(buffer));
    }

    fclose(fp);
}

int intcmp(const void *n1, const void *n2)
{
    const int num1 = *(const int *) n1;
    const int num2 = *(const int *) n2;
    return (num1 < num2) ? -1 : (num1 > num2);
}

void sortArray(void)
{
    qsort(a_data, a_used, sizeof(int), intcmp);
}

void freeMem(void)
{
    free(a_data);
}

static void sortOneFile(int fd, const char *file)
{
    err_remark("Leaf: processing file %s", file);
    readFile(file);
    sortArray();
    writeArray(fd);
    freeMem();
    err_remark("Leaf: finished file %s", file);
}
...