Как контролировать потоки с несколькими мьютексами и условиями? - PullRequest
0 голосов
/ 05 мая 2018

В приведенном ниже коде я написал программу для выполнения операций добавления / удаления массива int с использованием многопоточности. Условие состоит в том, что несколько потоков не могут выполнять операции в одной и той же ячейке, но параллельные операции могут выполняться в разных ячейках.

Я подумал, что для реализации таких условий мне нужно будет использовать несколько мьютексов и условных переменных, а именно столько ячеек в массиве. Начальное значение всех ячеек моего массива 10, а потоки увеличивают / уменьшают это значение на 3.

Кажется, что код ниже работает (значения ячеек массива после того, как все потоки закончили работать, как и ожидалось), но я не понимаю несколько вещей:

  1. Сначала я создаю потоки сумматора, которые спят на секунду. Кроме того, каждый поток имеет оператор printf, который запускается, если поток ожидает. Удаление потоков не спит, поэтому я ожидаю, что удаление потоков вызовет их операторы printf, потому что они должны подождать хотя бы секунду, прежде чем потоки сумматора завершат свою работу. Но потоки удаления никогда не вызывают printf.
  2. Мое второе беспокойство: как я уже упоминал, я сначала порождаю потоки сумматора, поэтому я ожидаю, что значение ячеек изменится с 10 до 13. Затем, если поток удаления получает блокировку, значение может перейти от 13 до 10 ИЛИ , если сумматор поток получает блокировку, тогда значение ячейки будет изменяться с 13 до 16. Но я не вижу поведения в printf инструкциях внутри потоков. Например, одна из printf последовательностей, которые у меня были: add thread id and cell id 1: cell value 10->13, затем remove thread id and cell id 1: cell value 10->7, затем add thread id and cell id 1: cell value 10->13. Это не имеет смысла. Я убедился, что все потоки указывают на один и тот же массив.

Итог Я хотел бы знать, является ли мое решение правильным, и если да, то почему происходит поведение, которое я описал. Если мое решение неверно, я бы оценил пример правильного решения или хотя бы общее направление.

Это код (вся логика в AdderThread, RemoveThread):

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define ARR_LEN 5
#define THREADS_NUM 5
#define INIT_VAL 10
#define ADD_VAL 3
#define REMOVE_VAL 3
#define ADDER_LOOPS 2

typedef struct helper_t {
    int threadId;
    int * arr;
    int * stateArr; //0 if free, 1 if busy
} helper_t;

enum STATE {FREE, BUSY};
enum ERRORS {MUTEX, COND, CREATE, JOIN, LOCK, UNLOCK, WAIT, BROADCAST};

pthread_mutex_t mutexArr[THREADS_NUM];
pthread_cond_t condArr[THREADS_NUM];

void errorHandler(int errorId) {
    switch (errorId) {
        case MUTEX:
            printf("mutex error\n");
            break;
        case COND:
            printf("cond error\n");
            break;
        case CREATE:
            printf("create error\n");
            break;
        case JOIN:
            printf("join error\n");
            break;
        case LOCK:
            printf("lock error\n");
            break;
        case UNLOCK:
            printf("unlock error\n");
            break;
        case WAIT:
            printf("wait error\n");
            break;
        case BROADCAST:
            printf("broadcast error\n");
            break;
        default:
            printf("default switch\n");
            break;
    }
}

void mallocError() {
    printf("malloc error\nExiting app\n");
    exit(EXIT_FAILURE);
}

void initMutexesAndConds(pthread_mutex_t * mutexArr, pthread_cond_t * condArr) {
    int i;

    for(i = 0; i < THREADS_NUM; i++) {
        pthread_mutex_init(&mutexArr[i], NULL);
        pthread_cond_init(&condArr[i], NULL);
    }
}

helper_t * initStructs(int * arr, int * stateArr) {
    int i;
    helper_t * helpers = (helper_t *) malloc(sizeof(helper_t) * THREADS_NUM);
    if(!helpers) {
        mallocError();
    } else {
        for(i = 0; i < THREADS_NUM; i++) {
            helpers[i].threadId = i;
            helpers[i].arr = arr;
            helpers[i].stateArr = stateArr;
        }
    }
    return helpers;
}

void printArr(int * arr, int len) {
    int i;
    for(i = 0; i < len; i++) {
        printf("%d, ", arr[i]);
    }
    printf("\n");
}

void * AdderThread(void * arg) {
    int i;
    helper_t * h = (helper_t *) arg;
    int id = h->threadId;
    for(i = 0; i < ADDER_LOOPS; i++) {
        pthread_mutex_t * mutex = &mutexArr[id];
        pthread_cond_t * cond = &condArr[id];
        if(pthread_mutex_lock(mutex)) {
            errorHandler(LOCK);
        }
        while(h->stateArr[id] == BUSY) {
            printf("adder id %d waiting...\n", id);
            if(pthread_cond_wait(cond, mutex)) {
                errorHandler(WAIT);
            }
        }
        h->stateArr[id] = BUSY;
        sleep(1);
        h->arr[id] = h->arr[id] + ADD_VAL;
        printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
        h->stateArr[id] = FREE;
        if(pthread_cond_broadcast(cond)) {
            errorHandler(BROADCAST);
        }
        if(pthread_mutex_unlock(mutex)) {
            errorHandler(UNLOCK);
        }
    }
    pthread_exit(NULL);
}

void * RemoveThread(void * arg) {
    helper_t * h = (helper_t *) arg;
    int id = h->threadId;
    pthread_mutex_t * mutex = &mutexArr[id];
    pthread_cond_t * cond = &condArr[id];
    if(pthread_mutex_lock(mutex)) {
        errorHandler(LOCK);
    }
    while(h->stateArr[id] == BUSY) {
        printf("remover id %d waiting...\n", id);
        if(pthread_cond_wait(cond, mutex)) {
            errorHandler(WAIT);
        }
    }
    h->stateArr[id] = BUSY;
    h->arr[id] = h->arr[id] - REMOVE_VAL;
    printf("remove thread id and cell id %d: cell value %d->%d\n", id, h->arr[id], h->arr[id]-ADD_VAL);
    h->stateArr[id] = FREE;
    if(pthread_cond_broadcast(cond)) {
        errorHandler(BROADCAST);
    }
    if(pthread_mutex_unlock(mutex)) {
        errorHandler(UNLOCK);
    }
    pthread_exit(NULL);
}

int main() {
    int i;
    helper_t * adderHelpers;
    helper_t * removeHelpers;
    pthread_t adders[THREADS_NUM];
    pthread_t removers[THREADS_NUM];
    int * arr = (int *) malloc(sizeof(int) * ARR_LEN);
    int * stateArr = (int *) malloc(sizeof(int) * ARR_LEN);
    if(!arr || !stateArr) {
        mallocError();
    }

    for(i = 0; i < ARR_LEN; i++) {
        arr[i] = INIT_VAL;
        stateArr[i] = FREE;
    }

    initMutexesAndConds(mutexArr, condArr);
    adderHelpers = initStructs(arr, stateArr);
    removeHelpers = initStructs(arr, stateArr);

    for(i = 0; i < THREADS_NUM; i++) {
        pthread_create(&adders[i], NULL, AdderThread, &adderHelpers[i]);
        pthread_create(&removers[i], NULL, RemoveThread, &removeHelpers[i]);
    }

    for(i = 0; i < THREADS_NUM; i++) {
        pthread_join(adders[i], NULL);
        pthread_join(removers[i], NULL);
    }

    printf("the results are:\n");
    printArr(arr, THREADS_NUM);
    printf("DONE.\n");

    return 0;
}

1 Ответ

0 голосов
/ 05 мая 2018

1) Эта кодовая последовательность в Addr:

   h->stateArr[id] = BUSY;
        sleep(1);
        h->arr[id] = h->arr[id] + ADD_VAL;
        printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
        h->stateArr[id] = FREE;

Выполнено с заблокированным мьютексом; таким образом, Remove никогда не получит шанс увидеть состояние как что-либо, кроме FREE.

2) Нет гарантии, что владение мьютексом чередуется (afaik), но, по крайней мере, для правильной координации потоков вы никогда не должны полагаться на такие детали реализации. Это разница между работой и «случается с работой», которая обычно приводит к «привыкли работать» ...

Если вы поместите sleep () между мьютексной разблокировкой и мьютексной блокировкой, у вас может быть лучший случай, но на самом деле, он просто разблокирует его, а затем снова блокирует, так что система вполне может просто позволить это продолжает выполняться.

[Мне не хватило места в комментариях ...]:

Да, переменные условия здесь ничего не делают для вас. Идея условной переменной заключается в том, чтобы иметь возможность получать уведомления, когда по какому-либо общему возражению произошло значительное событие, такое как изменение состояния.

Например, водохранилище может иметь одну переменную условия для уровня воды. Мультиплексированы на это может быть много условий: уровень <1 м; уровень> 5м; уровень> 10м. Чтобы сохранить независимость системы (таким образом, работающую), бит, который обновляет уровень, может просто:

pthread_mutex_lock(&levellock);
level = x;
pthread_cond_broadcast(&newlevel);
pthread_mutex_unlock(&levellock);

Актеры, выполняющие условия, будут делать что-то вроде:

pthread_mutex_lock(&levellock);
while (1) {
    if (level is my conditions) {
         pthread_mutex_unlock(&levellock);
         alert the media
         pthread_mutex_lock(&levellock);
    }
    pthread_cond_wait(&newlevel, &levellock);
}

Таким образом, я могу добавить множество «мониторов состояния», не нарушая код установки уровня или систему в целом. Многое конечно, но освобождая мьютекс, пока я предупреждаю СМИ, я избегаю, чтобы моя система мониторинга воды полагалась на обработку тревоги.

Если вы знакомы с «публикацией / подпиской», вы можете найти это знакомым. Это принципиально та же модель, только PS скрывает кучу деталей.

...