Понимание блокировок pthreads и переменных условий - PullRequest
1 голос
/ 26 мая 2020

У меня было упражнение о потоках, блокировках и условных переменных в C. Мне нужно было написать программу, которая получает данные, превращает их в связанный список, запускает 3 потока, каждый из которых вычисляет результат для каждого узла в списке, и основной поток выводит результаты после завершения evreyone.

Это основная функция :

int thread_finished_count;

// Lock and Conditional variable
pthread_mutex_t list_lock;
pthread_mutex_t thread_lock;
pthread_cond_t thread_cv;

int main(int argc, char const *argv[])
{
    node *list;
    int pairs_count, status;
    thread_finished_count = 0;

    /* get the data and start the threads */
    node *head = create_numbers(argc, argv, &pairs_count);
    list = head; // backup head for results
    pthread_t *threads = start_threads(&list);

    /* wait for threads and destroy lock */
    status = pthread_cond_wait(&thread_cv, &list_lock);
    chcek_status(status);
    status = pthread_mutex_destroy(&list_lock);
    chcek_status(status);
    status = pthread_mutex_destroy(&thread_lock);
    chcek_status(status);

    /* print result in original list */
    print_results(head);

    /* cleanup */
    wait_for_threads(threads, NUM_THREADS);
    free_list(head);
    free(threads);

    return EXIT_SUCCESS;
}

Обратите внимание, что функция create_numbers работает правильно, и список работает должным образом.

Вот код start_thread и thread_function:

pthread_t *start_threads(node **list)
{
    int status;
    pthread_t *threads = (pthread_t *)malloc(sizeof(pthread_t) * NUM_THREADS);
    check_malloc(threads);

    for (int i = 0; i < NUM_THREADS; i++)
    {
        status = pthread_create(&threads[i], NULL, thread_function, list);
        chcek_status(status);
    }
    return threads;
}

void *thread_function(node **list)
{
    int status, self_id = pthread_self();
    printf("im in %u\n", self_id);
    node *currentNode;

    while (1)
    {
        if (!(*list))
            break;
        status = pthread_mutex_lock(&list_lock);
        chcek_status(status);
        printf("list location %p thread %u\n", *list, self_id);
        if (!(*list))
        {
            status = pthread_mutex_unlock(&list_lock);
            chcek_status(status);
            break;
        }
        currentNode = (*list);
        (*list) = (*list)->next;
        status = pthread_mutex_unlock(&list_lock);
        chcek_status(status);
        currentNode->gcd = gcd(currentNode->num1, currentNode->num2);
        status = usleep(10);
        chcek_status(status);
    }
    status = pthread_mutex_lock(&thread_lock);
    chcek_status(status);
    thread_finished_count++;
    status = pthread_mutex_unlock(&thread_lock);
    chcek_status(status);
    if (thread_finished_count != 3)
        return NULL;
    status = pthread_cond_signal(&thread_cv);
    chcek_status(status);
    return NULL;
}
void chcek_status(int status)
{
    if (status != 0)
    {
        fputs("pthread_function() error\n", stderr);
        exit(EXIT_FAILURE);
    }
}

Обратите внимание, что self_id используется в целях отладки.

Моя проблема

  1. Моя основная проблема связана с разделением работы. Таким образом, каждый поток получает элемент из глобального связанного списка, вычисляет gcd, затем go и принимает следующий элемент. Я получаю этот эффект, только если добавляю usleep (10) после того, как разблокирую мьютекс в while l oop. Если я не добавлю usleep, ПЕРВЫЙ поток будет go и выполнит всю работу, в то время как другой поток просто ждет и войдет после того, как вся работа будет сделана.

Обратите внимание! : Я подумал о том, что, возможно, создается первый поток, и пока второй поток не будет создан, первый уже завершит sh все задания. Вот почему я добавил проверку «Я нахожусь в #threadID» с помощью usleep (10) при создании потока evrey. Они все входят, но только первый выполняет всю работу. Вот пример вывода, если я сплю после разблокировки мьютекса (обратите внимание на другой идентификатор потока)

с usleep

./v2 nums.txt
im in 1333593856
list location 0x7fffc4fb56a0 thread 1333593856
im in 1316685568
im in 1325139712
list location 0x7fffc4fb56c0 thread 1333593856
list location 0x7fffc4fb56e0 thread 1316685568
list location 0x7fffc4fb5700 thread 1325139712
list location 0x7fffc4fb5720 thread 1333593856
list location 0x7fffc4fb5740 thread 1316685568
list location 0x7fffc4fb5760 thread 1325139712
list location 0x7fffc4fb5780 thread 1333593856
list location 0x7fffc4fb57a0 thread 1316685568
list location 0x7fffc4fb57c0 thread 1325139712
list location 0x7fffc4fb57e0 thread 1333593856
list location 0x7fffc4fb5800 thread 1316685568
list location (nil) thread 1325139712
list location (nil) thread 1333593856
...
normal result output
...

И вот результат, если я закомментирую usleep после блокировки мьютекса (обратите внимание на тот же идентификатор потока) без usleep

  ./v2 nums.txt
im in 2631730944
list location 0x7fffe5b946a0 thread 2631730944
list location 0x7fffe5b946c0 thread 2631730944
list location 0x7fffe5b946e0 thread 2631730944
list location 0x7fffe5b94700 thread 2631730944
list location 0x7fffe5b94720 thread 2631730944
list location 0x7fffe5b94740 thread 2631730944
list location 0x7fffe5b94760 thread 2631730944
list location 0x7fffe5b94780 thread 2631730944
list location 0x7fffe5b947a0 thread 2631730944
list location 0x7fffe5b947c0 thread 2631730944
list location 0x7fffe5b947e0 thread 2631730944
list location 0x7fffe5b94800 thread 2631730944
im in 2623276800
im in 2614822656
...
normal result output
...
Второй вопрос касается порядка работы потоков. В моем упражнении я просил меня не использовать соединение для синхронизации потоков (использовать только в конце для «свободных ресурсов»), а использовать эту условную переменную.

Моя цель состоит в том, чтобы каждый поток принимал элемент, сделайте расчет, и тем временем другой поток будет go и возьмет другой элемент, и новый поток возьмет каждый элемент (или, по крайней мере, близок к этому)

Спасибо за чтение, и я приветствую вашу помощь.

Ответы [ 2 ]

1 голос
/ 26 мая 2020

Во-первых, вы выполняете работу gcd(), удерживая блокировку ... так что (a) только один поток будет выполнять любую работу одновременно, хотя (b) , который не полностью объясняет, почему только один поток выполняет (почти) всю работу - как говорит KamilCuk, может быть так мало работы, что она (почти) все выполняется до того, как второй поток проснется правильно. [Еще exoti c, может быть некоторая задержка между потоком «a», разблокирующим мьютекс, и запуском другого потока, так что поток «a» может получить мьютекс до того, как туда попадет другой поток.]

POSIX говорит, что когда мьютекс разблокирован, если есть ожидающие, тогда «политика планирования должна определять, какой поток получит мьютекс». «Политика планирования» по умолчанию (насколько мне известно) определена в реализации.

Вы можете попробовать несколько вещей: (1) используйте pthread_barrier_t для хранения всех потоки в начале thread_function(), пока все они не будут запущены; (2) используйте sched_yield(void) после pthread_mutex_unlock(), чтобы побудить систему запустить новый запускаемый поток.

Во-вторых, вы ни при каких обстоятельствах не должны рассматривать «условную переменную» как сигнал. Чтобы main() знал, что все потоки завершены, вам нужен счетчик, который может быть pthread_barrier_t; или это может быть простое целое число, защищенное мьютексом, с «условной переменной» для удержания основного потока, пока он ожидает; или это может быть счетчик (в main()) и семафор (отправляется один раз каждым потоком при выходе).

В-третьих, вы показываете pthread_cond_wait(&cv, &lock); в main(). В этот момент main() должен владеть lock ... и важно, когда это произошло. Но: в существующем виде, поток первый , который обнаружит list пустой, запустит cv, а main() продолжит работу, даже если другие потоки все еще работают. Хотя однажды main() действительно повторно получает lock, все потоки, которые все еще работают, будут либо завершены, либо застрянут на lock. (Это беспорядок.)


В общем, шаблон для использования «условной переменной»:

    pthread_mutex_lock(&...lock) ;

    while (!(... thing we need ...))
      pthread_cond_wait(&...cond_var, &...lock) ;

    ... do stuff now we have what we need ....

    pthread_mutex_unlock(&...lock) ;

NB: «условная переменная» не имеет значения ... несмотря на название, это не флаг, указывающий, что какое-то условие истинно. «Условная переменная» - это, по сути, очередь потоков, ожидающих перезапуска. Когда сигнализируется «условная переменная», по крайней мере один ожидающий поток будет перезапущен, но если нет ожидающих потоков, ничего не происходит, в частности (так называемая) «условная переменная» сохраняет нет памяти сигнала.

В новом коде, следуя приведенному выше шаблону, main() должно:

    /* wait for threads .... */

    status = pthread_mutex_lock(&thread_lock);
    chcek_status(status);

    while (thread_finished_count != 3)
      {
        pthread_cond_wait(&thread_cv, &thread_lock) ;
        chcek_status(status);
      } ;

    status = pthread_mutex_unlock(&thread_lock) ;
    chcek_status(status);

Итак, что здесь происходит?

  1. main() ожидает thread_finished_count == 3

  2. thread_finished_count - это общая переменная, "защищенная" мьютексом thread_lock.

    ... поэтому он увеличивается на thread_function() под мьютексом.

    ... и main() также должны читать его под мьютексом.

  3. если main() находит thread_finished_count != 3, он должен подождать.

    для этого он выполняет: pthread_cond_wait(&thread_cv, &thread_lock), что:

    • разблокирует thread_lock

    • помещает поток в очередь thread_cv ожидающих потоков.

    и выполняет эти атомарно .

  4. когда thread_function() выполняет pthread_cond_signal(&thread_cv), он пробуждает ожидающий поток.

  5. когда поток main() просыпается, он сначала повторно получает thread_lock ...

    ... так что он может затем перейдите к перечитыванию thread_finished_count, чтобы увидеть, теперь ли это 3.

FWIW: я рекомендую не уничтожать мьютексы и т. Д. c до после все потоки будут объединены.

0 голосов
/ 03 июня 2020

Я глубже изучил, как glib c (v2.30 на Linux и x86_64, по крайней мере) реализует pthread_mutex_lock() и _unlock().

Оказывается, что _lock() работает примерно так:

  if (atomic_cmp_xchg(mutex->lock, 0, 1))
    return <OK> ;             // mutex->lock was 0, is now 1

  while (1)
    {
      if (atomic_xchg(mutex->lock, 2) == 0)
        return <OK> ;        // mutex->lock was 0, is now 2

      ...do FUTEX_WAIT(2)... // suspend thread iff mutex->lock == 2...
    } ;

И _unlock() работает примерно так:

  if (atomic_xchg(mutex->lock, 0) == 2)  // set mutex->lock == 0
    ...do FUTEX_WAKE(1)...               // if may have waiter(s) start 1

Теперь:

  • mutex->lock: 0 => разблокировано, 1 => заблокировано, но нет официантов, 2 => заблокировано с официантами

    'заблокировано, но нет официантов' оптимизируется для случая, когда нет блокировки разногласия и нет необходимости выполнять FUTEX_WAKE в _unlock().

  • функции _lock() / _unlock() находятся в библиотеке - они не в ядре.

    ... в частности, право собственности на мьютекс - это вопрос библиотеки, не ядра.

  • FUTEX_WAIT(2) - это вызов ядра, которое помещает поток в очередь ожидания, связанную с мьютексом, если только mutex->lock != 2.

    Ядро не проверяет наличие mutex->lock == 2 и добавляет поток в очередь атомарно . Это касается случая, когда _unlock() вызывается после atomic_xchg(mutex->lock, 2).

  • FUTEX_WAKE(1) также является вызовом ядра, а страница руководства futex сообщает нам:

    FUTEX_WAKE (начиная с Linux 2.6. 0)

    Эта операция будит не более 'val' ожидающих официантов ... Не дается никаких гарантий относительно того, какие из них разбужены (например, официант с более высоким приоритетом планирования) не гарантируется, что он будет разбужен вместо официанта с более низким приоритетом).

    где 'val' в данном случае равно 1.

    Хотя в документации указано «нет гарантии, какие официанты разбужены», очередь выглядит как минимум FIFO.

Обратите внимание, что:

  1. _unlock() не не передает мьютекс потоку, запущенному FUTEX_WAKE.

  2. после пробуждения поток снова попытается получить блокировку. .

    ... но может быть побежден любым другим запущенным потоком, включая поток, который только что выполнил _unlock().

* 108 9 * Я считаю, что именно поэтому вы не видели, чтобы работа распределялась по потокам. Для каждого из них так мало работы, что поток может разблокировать мьютекс, выполнить работу и вернуться, чтобы снова заблокировать мьютекс до поток, разбуженный разблокировкой, может начать работу и преуспеть в блокировка мьютекса.
...