Как читать один и тот же файл побайтно асинхронно из нескольких потоков? - PullRequest
0 голосов
/ 20 декабря 2018

Я пытаюсь прочитать файл с байтом aio.h, используя aio_read с несколькими потоками.Но я не знаю, нахожусь ли я на правильном пути, поскольку в Интернете не так много вещей, которые можно читать.

Я только что создал функцию worker, чтобы передать ее своим потокам.А также в качестве аргумента для передачи в поток я создал структуру с именем thread_arguments и передал в нее несколько необходимых аргументов, которые понадобятся для открытия aiocb, например offset, file_path, buffer size и priority.

Я могу прочитать файл с одним потоком от начала до конца успешно.Но когда дело доходит до чтения файла в виде фрагментов из нескольких потоков, я не могу это сделать.И я даже не уверен, смогу ли я сделать это с aio->reqprio без использования семафоров или мьютексов.(Попытка открыть файл из нескольких потоков одновременно?)

Как я могу асинхронно прочитать несколько байтов из нескольких потоков?

Допустим, файл содержит «foobarquax», и у нас есть три потока, использующие библиотеку aio.

Затем первый должен прочитать «foo»,

, второй должен прочитать «bar"и

последний должен читать" quax "асинхронно.

Здесь вы можете увидеть скриншоты проблем, связанных с его работой с несколькими потоками

#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <aio.h>
#include <string.h>
#include <fcntl.h> // open -> file descriptor O_RDONLY, O_WRONLY, O_RDWR
#include <errno.h>
#include <unistd.h>

typedef struct thread_args {
    char *source_path;
    char *destination_path;
    long int buffer_size;
    long int buffer_size_last; // buffer size for the last thread in case there is a remainder.
    long int offset;
    int priority;
} t_args;


void *worker(void *thread_args) {


    t_args *arguments = (t_args *) thread_args;


    struct aiocb *aiocbRead;

    aiocbRead = calloc(1, sizeof(struct aiocb));

    aiocbRead->aio_fildes = open(arguments->source_path, O_RDONLY);

    if (aiocbRead->aio_fildes == -1) {
        printf("Error");

    }

    printf("opened on descriptor %d\n", aiocbRead->aio_fildes);

    aiocbRead->aio_buf = malloc(sizeof(arguments->buffer_size));
    aiocbRead->aio_offset = arguments->offset;
    aiocbRead->aio_nbytes = (size_t) arguments->buffer_size;
    aiocbRead->aio_reqprio = arguments->priority;

    int s = aio_read(aiocbRead);

    if (s == -1) {
        printf("There was an error");
    }

    while (aio_error(aiocbRead) == EINPROGRESS) {}

    printf("Bytes read %ld", aio_return(aiocbRead));

    close(aiocbRead->aio_fildes);

    for (int i = 0; i < arguments->buffer_size ; ++i) {
        printf("%c\n", (*((char *) aiocbRead->aio_buf + i)));
    }
}

// Returns a random alphabetic character
char getrandomChar() {


    int letterTypeFlag = (rand() % 2);

    if (letterTypeFlag)
        return (char) ('a' + (rand() % 26));
    else
        return (char) ('A' + (rand() % 26));
}

void createRandomFile(char *source, int numberofBytes) {

    FILE *fp = fopen(source, "w");

    for (int i = 0; i < numberofBytes; i++) {
        fprintf(fp, "%c", getrandomChar());
    }

    fclose(fp);

}

int main(int argc, char *argv[]) {

    char *source_path = argv[1];
    char *destination_path = argv[2];
    long int nthreads = strtol(argv[3], NULL, 10);


    // Set the seed.
    srand(time(NULL));

    // Get random number of bytes to write to create the random file.
    int numberofBytes = 10 + (rand() % 100000001);

    // Create a random filled file at the source path.
    createRandomFile(source_path, 100);

    // Calculate the payload for each thread.
    long int payload = 100 / nthreads;
    long int payloadLast = payload + 100 % nthreads;

    // Create a thread argument to pass to pthread.
    t_args *thread_arguments = (t_args *) malloc(nthreads * sizeof(t_args));

    for (int l = 0; l < nthreads; ++l) {

        // Set arguments in the struct.
        (&thread_arguments)[l]->source_path = source_path;
        (&thread_arguments)[l]->destination_path = destination_path;
        (&thread_arguments)[l]->buffer_size = payload;
        (&thread_arguments)[l]->buffer_size_last = payloadLast;
        (&thread_arguments)[l]->offset = l * payload;
        (&thread_arguments)[l]->priority = l;

    }


    pthread_t tID[nthreads];

    // Create pthreads.
    for (int i = 0; i < nthreads; ++i) {
        pthread_create(&tID[i], NULL, worker, (void *) &thread_arguments[i]);
    }

    // Wait for pthreads to be done.
    for (int j = 0; j < nthreads; ++j) {
        pthread_join(tID[j], NULL);
    }

    free(thread_arguments);

    return 0;
}

Этот код успешно читает, если я просто вызываю его из одного потока, но не работает, если я использую его для более чем одного потока, что я и хочу.

...