Как и где объявить мьютекс - PullRequest
0 голосов
/ 06 июня 2018

Я новичок в многопоточности и пытаюсь выяснить, почему в этой реализации пула потоков с очередью существует мьютекс (dstrymutex), который определен в файле c, а не как часть структуры threadpool каквсе остальные мьютексы.Для этого есть причина?И в то время как мы находимся в этом, я хотел бы знать правильное место, чтобы объявить взаимные исключения, которые используются так же, как они используются здесь.Большое спасибо!

Вот код: h file:

#ifndef __THREAD_POOL__
#define __THREAD_POOL__

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "osqueue.h"


#define FAILURE -1
#define SUCCESS 0
#define DONT_WAIT_FOR_TASKS 0

typedef struct thread_pool
{
     //The field x is here because a struct without fields
     //doesn't compile. Remove it once you add fields of your own
     int numOfThreads;
     pthread_t* threads;
     struct os_queue* tasksQueue;
     pthread_mutex_t lock;
     pthread_mutex_t queueLock;
     pthread_cond_t notify;
     int stopped;
     int canInsert;
}ThreadPool;

/**
 * creates a thread pool struct.
 * @param numOfThreads number of threads in the thread pool.
 * @return reference to new thread pool struct if succeeded, NULL if failed.
 */
ThreadPool* tpCreate(int numOfThreads);

/**
 * Destroys the thread pool.
 * @param threadPool thread pool
 * @param shouldWaitForTasks 0 - dont wait for tasks in the queue, else - wait for tasks.
 */
void tpDestroy(ThreadPool* threadPool, int shouldWaitForTasks);

/**
 * inserts a task to the tasks queue of the thread pool.
 * @param threadPool thread pool
 * @param computeFunc task
 * @param param argument to the task
 * @return 0- success , -1 - fail
 */
int tpInsertTask(ThreadPool* threadPool, void (*computeFunc) (void *), void* param);

#endif

c file:

#include <fcntl.h>
#include "threadPool.h"
#define STDERR_FD 2
#define SYS_CALL_FAILURE 10

pthread_mutex_t destryLock;

typedef struct task
{
    void (*computeFunc)(void *param);
    void* param;
}Task;

/**
 * prints error in sys call to stderr.
 */
void printErrorInSysCallToSTDERR() {
    char error_msg[] = "Error in system call\n";
    write(STDERR_FD, error_msg, sizeof(error_msg));
}

/**
 * threads function. tasks are taken and executed by the threads in the thread pool from the tasks queue.
 * @param args expected ThreadPool*
 * @return void
 */
void* execute(void* args) {
    ThreadPool* tp = (ThreadPool*)args;
    struct os_queue* taskQueue = tp->tasksQueue;
    printf("New thread was created\n");

    while (!tp->stopped && !(tp->canInsert == 0 && osIsQueueEmpty(taskQueue))) {
        /* Lock must be taken to wait on conditional variable */
        pthread_mutex_lock(&(tp->queueLock));

        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        if((osIsQueueEmpty(taskQueue)) && (!tp->stopped)) {
            printf("Busy\n");
            pthread_cond_wait(&(tp->notify), &(tp->queueLock));
        }
        pthread_mutex_unlock(&(tp->queueLock));

        pthread_mutex_lock(&(tp->lock));
        if (!(osIsQueueEmpty(taskQueue))) {
            // take task from the queue
            Task* task = osDequeue(taskQueue);
            pthread_mutex_unlock(&(tp->lock));
            // execute task
            task->computeFunc(task->param);
            free(task);
        }
        else {
            pthread_mutex_unlock(&(tp->lock));
        }
    }
}

/**
 * creates a thread pool struct.
 * @param numOfThreads number of threads in the thread pool.
 * @return reference to new thread pool struct if succeeded, NULL if failed.
 */
ThreadPool* tpCreate(int numOfThreads) {
    int out = open("output",  O_CREAT | O_TRUNC | O_WRONLY, 0644);
    if (out == -1) {
        printf("Failed to open output file\n");
        printErrorInSysCallToSTDERR();
        exit(SYS_CALL_FAILURE);
    }
    // replace standard output with output file
    if (dup2(out, STDOUT_FILENO) == -1) {
        printf("Failed to operate dup2 for out\n");
        printErrorInSysCallToSTDERR();
        exit(SYS_CALL_FAILURE);
    }

    ThreadPool* tp = (ThreadPool*)malloc(sizeof(ThreadPool));
    if (tp == NULL) {
        printf("Failure: allocate memory for thread pool struct");
        return NULL;
    }
    tp->numOfThreads = numOfThreads;

    tp->threads = (pthread_t*)malloc(sizeof(pthread_t) * tp->numOfThreads);
    if (tp->threads == NULL) {
        printf("Failure: allocate memory for threads array");
        return NULL;
    }

    tp->tasksQueue = osCreateQueue();
    pthread_mutex_init(&(tp->lock), NULL);
    tp->stopped = 0;
    tp->canInsert = 1;

    if (pthread_mutex_init(&(tp->queueLock), NULL) != 0 ||
            pthread_mutex_init(&(tp->queueLock), NULL) != 0 ||
            pthread_cond_init(&(tp->notify), NULL) != 0) {
        printf("Failure: initialize one required mutex or more\n");
        tpDestroy(tp, 0);
        return NULL;
    }

    int i;
    for (i = 0; i < tp->numOfThreads; i++) {
         if(pthread_create(&(tp->threads[i]), NULL, execute, (void *)tp) != 0) {
             printf("Failure: creating a thread failed.\n");
         }
    }

    return tp;
}

/**
 * inserts a task to the tasks queue of the thread pool.
 * @param threadPool thread pool
 * @param computeFunc task
 * @param param argument to the task
 * @return 0- success , -1 - fail
 */
int tpInsertTask(ThreadPool* threadPool, void (*computeFunc) (void *), void* param) {
    if(threadPool == NULL || computeFunc == NULL) {
        return FAILURE;
    }

    if (!(threadPool->canInsert)) {
        return FAILURE;
    }

    Task* task = (Task*)malloc(sizeof(Task));
    if (task == NULL) {
        printf("Failure: allocate memory for threads array");
        return FAILURE;
    }

    task->computeFunc = computeFunc;
    task->param = param;

    osEnqueue(threadPool->tasksQueue, (void *)task);

    pthread_mutex_lock(&(threadPool->queueLock));
    // wake up thread that wait as long as the tasks queue is empty
    if(pthread_cond_signal(&(threadPool->notify)) != 0) {
        printf("Failure: signal opertion in tpInsertTask\n");
    }
    pthread_mutex_unlock(&(threadPool->queueLock));
    return SUCCESS;
}

/**
 * Destroys the thread pool.
 * @param threadPool thread pool
 * @param shouldWaitForTasks 0 - dont wait for tasks in the queue, else - wait for tasks.
 */
void tpDestroy(ThreadPool* threadPool, int shouldWaitForTasks) {
    if (threadPool == NULL) {
        return;
    }

    pthread_mutex_lock(&destryLock);
    // first time enter to tpDestory with valid thread pool
    if ( threadPool->canInsert != 0) {
        threadPool->canInsert = 0;
        // make sure tpDestroy will ne called only once for thr thread pool
    } else {
        return;
    }
    pthread_mutex_unlock(&destryLock);


    if (shouldWaitForTasks == DONT_WAIT_FOR_TASKS) {
        threadPool->stopped = 1;
    }
    int i, err;

    pthread_mutex_lock(&(threadPool->queueLock));

    /* Wake up all worker threads */
    if((pthread_cond_broadcast(&(threadPool->notify)) != 0) ||
       (pthread_mutex_unlock(&(threadPool->queueLock)) != 0)) {
        printf("Exit due failure in tpDestory\n");
        exit(1);
    }

    for (i = 0; i < threadPool->numOfThreads; i++) {
        err = pthread_join(threadPool->threads[i], NULL);
        if (err != 0) {
            printf("Failure: waiting for thread no. %d\n", i);
        }
    }


    threadPool->stopped = 1;

    //free memory
    while (!osIsQueueEmpty(threadPool->tasksQueue)) {
        printf("Task was erased from tasks queue\n");
        Task* task = osDequeue(threadPool->tasksQueue);
        free(task);
    }

    osDestroyQueue(threadPool->tasksQueue);
    free(threadPool->threads);
    pthread_mutex_destroy(&(threadPool->lock));
    pthread_mutex_destroy(&(threadPool->queueLock));
    pthread_mutex_destroy(&destryLock);
    free(threadPool);
}

1 Ответ

0 голосов
/ 06 июня 2018

Из кода не совсем ясно, каково намерение мьютекса destryLock, тем более что он не инициализируется статическим инициализатором PTHREAD_MUTEX_INITIALIZER и не инициализируется pthread_mutex_init.Однако эта функция уничтожается в этой функции tpDestroy, поэтому любые вызовы pthread_mutex_lock, скорее всего, приводят к ошибке EINVAL.

Это, как говорится, основано на том, как выглядит tpDestroyкак будто должен сделать, то есть уничтожить объект пула потоков, созданный с помощью tpCreate, в этом коде неясно, каково было намерение для логики;следует отметить, что с этим может возникнуть условие взаимоблокировки:

pthread_mutex_lock(&destryLock);
// first time enter to tpDestory with valid thread pool
if ( threadPool->canInsert != 0) {
    threadPool->canInsert = 0;
    // make sure tpDestroy will ne called only once for thr thread pool
} else {
    return; // dead lock since not unlocking after having locked
}
pthread_mutex_unlock(&destryLock);

Это заставляет поверить, что этот код был построен (по крайней мере частично) кем-то, кто не полностью понимал многопоточность,или не совсем понял, как проект должен соответствовать пулу потоков.

Имеет смысл поместить мьютекс destryLock в саму структуру пула потоков, поскольку функция работает с объектом пула потоков.переданный, а не глобальный.

Я хотел бы знать правильное место для объявления мьютексов, которые используются так же, как они используются здесь.

Этот вопрос немного широк, учитывая ваше понимание примитивов многопоточности и синхронизации, вместо этого я сосредоточусь на , почему вы хотите мьютекс против , где вы хотите.

Мьютекс позволяет блокировать участки кода несколькими потоками, так что только один поток может одновременно получить доступ к коду.Вы делаете это, потому что в многоядерных системах вполне возможно, чтобы несколько потоков одновременно обращались к одним и тем же данным, что приводило к возникновению условий гонки и, следовательно, к неопределенному поведению.

Если вы хотите заблокироватькод из нескольких потоков, тогда , где может стать немного более понятным, так как вы сможете определить, должен ли мьютекс быть глобальным / локальным статическим объектом или он должен быть объектом-членом.

В качестве примера, скажем, у меня есть игра с кучей врагов;Скорее всего, я оставлю множество врагов в каком-то списке.Когда я хочу перебрать список врагов, скажем, для обнаружения столкновений, ИИ или других игровых эффектов, если в моей игре несколько потоков, действующих по списку врагов, я могу захотеть, чтобы мьютекс блокировался по всему списку, пока я выполняю преформунезависимо от логики игры на врагах, поэтому состояние врагов может быть точным для всех потоков.Это, однако, может быть не лучшим вариантом, так как может привести к задержке;вместо этого я мог бы захотеть создать мьютекс для каждого врага и заблокировать только врага, на которого воздействует логика.

Так что это больше о том, какие объекты с изменяемым состоянием вы хотите защитить.

Iнадеюсь, что это может помочь.

...