Как я могу выполнить несколько потоков, чтобы объединить несколько раз в процесс? (Pthread_mutex_lock) - PullRequest
0 голосов
/ 16 января 2019

Я хочу написать программу, которая считается до 100. Я хочу сделать это с 10 потоками, используя блокировку pthread. Когда программа входит в поток, она генерирует число от 0 до 2, это значение будет добавлено к ее индексу массива, а также к глобальной переменной sum. Когда значение суммы достигает 100, каждый поток должен напечатать свое собственное значение массива (сумма всего массива должна быть равна переменной суммы).

Моя проблема заключается в следующем: всегда первый поток блокирует переменную mutex, но я хочу распределить задачу между всеми потоками (arr [1] = 100, каждый другой = 0, но я хочу, например, arr [1] = 14, обр. [2] = 8 и т. Д. До 100). Где я не прав?

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include <errno.h>

pthread_mutex_t mutex;
int arr[10];
int sum = 0;

void *add (void* input){

    int *id = (int*) input, x, s;
    int ind = *id;
    while (sum < 100){
        while (1){
            if (pthread_mutex_trylock(&mutex) != EBUSY)
                break;
        }
        if (sum < 100){
            x = rand() % 3;
            arr[ind] = arr[ind] + x;
            sum += x;
        }
        pthread_mutex_unlock(&mutex);
        sleep(0.1);
    }
    printf("The %d. thread got %d points!\n", ind, arr[ind]);
    return NULL;
}

int main (void){

    int i;
    pthread_t threads[10];
    pthread_mutex_init(&mutex, NULL);
    srand(time(NULL));
    for (i = 0; i < 10; i++){
        if (pthread_create(&threads[i], NULL, add, &i)){
            perror("pthread_create");
            exit(1);
        }
    }
    for (i = 0; i < 10; i++){
        if (pthread_join (threads[i], NULL)){
            perror("pthread_join");
        }
    }
    pthread_mutex_destroy(&mutex);
    return 0;
}

Выход:

Поток 1. получил 100 баллов!

2. поток получил 0 баллов!

Тема 3. получила 0 баллов!

Тема 4. получила 0 баллов!

5. поток получил 0 баллов!

Тема 6. получила 0 баллов!

7. поток получил 0 баллов!

Тема 8. получила 0 баллов!

9. поток получил 0 баллов!

Поток 0. получил 0 баллов!

Ответы [ 2 ]

0 голосов
/ 16 января 2019

У вас есть несколько проблем в вашем коде, с различной степенью релевантности для конкретной проблемы, о которой вы спрашивали. Прежде чем мы пойдем дальше, один вывод I , полученный при запуске вашего кода, как написано, может быть иллюстративным:

The 2. thread got 100 points!
The 8. thread got 0 points!
The 9. thread got 0 points!
The 5. thread got 0 points!
The 5. thread got 0 points!
The 6. thread got 0 points!
The 7. thread got 0 points!
The 2. thread got 100 points!
The 4. thread got 0 points!
The 1. thread got 0 points!

Обратите внимание, что некоторые потоки получили одинаковый индекс счетчика. Это может произойти из-за гонки данных между основным потоком и каждым дочерним потоком: основной поток передает указатель на свою локальную переменную i, а затем переходит к изменению значения этой переменной. Между тем, дочерний поток читает значение переменной через указатель. Эти действия не синхронизированы, поэтому поведение не определено.

Эта конкретная проблема имеет несколько решений. Простейшим, вероятно, является приведение i к void * и передача результата (по значению). На самом деле это довольно часто, когда вы просто хотите передать целое число:

        if (pthread_create(&threads[i], NULL, add, (void *)i)) {
            // ...

Конечно, функция потока должна преобразовать ее обратно:

void *add (void* input) {
    int /*no need for 'id' */ x, s;
    int ind = (int) input;
    // ...

Далее, обратите внимание, что у вас есть другая гонка данных среди всех ваших дочерних потоков, где вы читаете значение sum в состоянии цикла while. Тот, кажется, не кусает вас в данный момент, но это может сделать в любое время. Поскольку потоки, как группа, читают и изменяют sum, вы должны обеспечить синхронизацию всех таких обращений, например, выполняя их только при удержании мьютекса заблокированным.

Пропустив немного (мы вернемся), у вас проблема с вашим sleep() звонком. Параметр этой функции имеет тип int, поэтому ваш фактический аргумент, double 0.1, преобразуется в int, что дает 0. Компилятор может оптимизировать это вообще, а если нет, то он может эффективно быть неоператором. Еще важнее , однако. sleep() - просто неправильный инструмент для этой работы или почти для любой работы, связанной с синхронизацией между потоками и синхронизацией.

Признавая теперь, что спящего нет, вы должны увидеть, что ваш внешний цикл while довольно плотный, и поток, который только что разблокировал мьютекс, немедленно попытается снова его заблокировать. Такая попытка повторной блокировки весьма распространена с первой попытки, поскольку поведение мьютекса по умолчанию не дает никаких обещаний относительно справедливости планирования потоков.

Кроме того, вы не получите особой выгоды от своеобразной петли занятости вокруг pthread_mutex_trylock(). Просто используйте pthread_mutex_lock(), если в противном случае все, что вы собираетесь сделать, когда мьютекс занят, это повторить попытку. Это сэкономит ваше процессорное время и не будет существенно отличаться от семантики.

В целом, у вас есть проблема со стратегией. Мьютексы, как правило, не дают никаких гарантий относительно справедливого планирования. Обычно для тесного многопоточного взаимодействия, такого как это, вам нужно выполнить какое-то управление расписанием вручную, и это, как правило, приносит большую пользу от добавления условной переменной в смесь.

Похоже, что вы не хотите заставлять потоки по очереди, но, возможно, этого будет достаточно, чтобы гарантировать, что только что запущенный поток не сразу снова будет выбран. В этом случае вы можете добавить общую переменную, которая записывает индекс только что запущенного потока, и использовать его, чтобы предотвратить повторный выбор в качестве следующего потока. Схематически функция потока может выглядеть примерно так:

  • цикл до бесконечности:
    • заблокировать мьютекс
    • цикл до бесконечности:
      • сравнить индекс последнего потока с моим индексом
      • если он другой, оторваться от (внутреннего) цикла
      • еще подождите условную переменную
    • установить мой индекс как индекс последней нити
    • (по-прежнему удерживая мьютекс заблокированным) транслировать или подавать сигнал CV
    • если sum меньше 100
      • обновление sum
      • разблокировать мьютекс
    • в противном случае (sum is> = 100)
      • разблокировка мьютекса
      • разрыв от (внешней) петли
0 голосов
/ 16 января 2019

Я нашел две проблемы.

Когда я попробовал это на Windows с MSys2, мой вывод был

The 1. thread got 101 points!
The 1. thread got 101 points!
The 2. thread got 0 points!
The 3. thread got 0 points!
The 5. thread got 0 points!
The 6. thread got 0 points!
The 6. thread got 0 points!
The 8. thread got 0 points!
The 9. thread got 0 points!
The 0. thread got 0 points!

Вы передаете адрес вашего счетчика циклов i в потоки. Эта переменная не является константой, поэтому потоки могут прочитать значение, которое уже изменилось. Чтобы исправить это, я создал массив int ids[10] и передал каждому потоку свой собственный элемент массива идентификаторов.

Функция sleep использует аргумент unsigned int, поэтому вы эффективно вызываете sleep(0). С sleep(1) программа работает медленнее, но работает как положено. Вы можете использовать usleep, если вы хотите спать в течение доли секунды.

Модифицированная программа:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <time.h>
#include <errno.h>

pthread_mutex_t mutex;
int arr[10];
int sum = 0;

void *add (void* input){

    int *id = (int*) input, x, s;
    int ind = *id;
    while (sum < 100){
        while (1){
            if (pthread_mutex_trylock(&mutex) != EBUSY)
                break;
        }
        if (sum < 100){
            x = rand() % 3;
            arr[ind] = arr[ind] + x;
            sum += x;
        }
        pthread_mutex_unlock(&mutex);
        usleep(100000);
    }
    printf("The %d. thread got %d points!\n", ind, arr[ind]);
    return NULL;
}

int main (void){

    int i;
    pthread_t threads[10];
    int ids[10];
    pthread_mutex_init(&mutex, NULL);
    srand(time(NULL));
    for (i = 0; i < 10; i++){
        ids[i] = i;
        if (pthread_create(&threads[i], NULL, add, &ids[i])){
            perror("pthread_create");
            exit(1);
        }
    }
    for (i = 0; i < 10; i++){
        if (pthread_join (threads[i], NULL)){
            perror("pthread_join");
        }
    }
    pthread_mutex_destroy(&mutex);
    return 0;
}
...