Приостановка потоков пула потоков в C - PullRequest
0 голосов
/ 05 мая 2020

Я работаю над проектом, основанным на параллельном программировании, где мне нужно выполнить поставленную задачу максимально эффективно (с точки зрения затрат времени и энергии). Для этого мне нужно приостановить некоторые потоки из рабочего пула в зависимости от некоторых условий. Эти потоки создаются с помощью pthread_create ().

У меня есть два типа рабочих пулов: один хранит активные потоки, а другой - приостановленные потоки. После определения потока, который должен быть приостановлен, я помещаю sh его идентификатор потока в свой пул приостановленных потоков, а затем приостанавливаю поток с помощью pthread_kill.

push_task_suspended(threadID);
int status = pthread_kill(threadID,SIGSTOP);

Но при использовании этого я получаю ошибку сегментации. Я запустил gdb с этим кодом, он показывает ошибку сегментации из-за pthread_kill.

Подскажите, пожалуйста, почему я получаю это?

1 Ответ

2 голосов
/ 05 мая 2020

Я не знаю, почему происходит сбой pthread_kill(threadID,SIGSTOP) - думаю, threadID не pthread_t для потока? - но это определенно не лучший способ решения проблемы!

Переменные условия немного сложны, но их стоит понимать. Я немного взволновался ... но надеюсь, что это полезно.

Использование вашей собственной очереди 'task_suspended' - с sema_t

Предположим, у вас есть мьютекс для удаления из очереди ожидающих задач и постановки в очередь простаивающих рабочих. Тогда простаивающий работник должен:

  loop:
    lock(mutex)
      .... look for task, but if none pending ....
      enqueue(self)  -- on task_suspended queue
    unlock(mutex)    -- (a)
    suspend(self)    -- (b)
    goto loop

А при добавлении задачи лог c будет:

    lock(mutex)
      enqueue(task)  -- on task pending queue
      if (worker-idle-queue-not-empty)
        dequeue(worker)
        desuspend(worker)
    unlock(mutex)

Фактически, desuspend() не обязательно внутри mutex, но это мелочь.

Важно то, что desuspend() должен работать, даже если это происходит между unlock() at (a) и suspend() at (b ). Вы можете дать каждому потоку свой собственный семафор sem_t - тогда suspend() - это sem_wait(), а desuspend() - sem_post(). [Но нет, вы не можете использовать для этого мьютекс !!]

Использование «переменной условия»

С вашей собственной очередью «task_suspended» вы изобретаете колесо.

Как упоминалось в комментариях выше, инструмент для этого задания предусмотрена (так называемая) «условная переменная» - pthread_cond_t.

Ключ к использованию «условных переменных» - это понимать, что они абсолютно не переменные - они не имеют значение, они не в любом смысле подсчитывают количество pthread_cond_wait() и / или pthread_cond_signal() ... они не форма семафора. Несмотря на название, pthread_cond_t лучше всего рассматривать как очередь потоков. А затем:

    pthread_cond_wait(cond, mutex)  is, effectively:  enqueue(self)   -- on 'cond'
                                                      unlock(mutex)
                                                      suspend(self)
                                                      ....wait for signal...
                                                      lock(mutex)

где по мнению некоторых магов c enqueue() + unlock() + suspend() - это одна операция (для всех потоков), а затем:

    pthread_cond_signal(cond)  is, effectively:      if ('cond' queue-not-empty)
                                                       dequeue(thread)  -- from 'cond'
                                                       desuspend(thread)

где, также, каким-то магом c, это всего лишь одна операция. [NB: pthread_cond_signal() разрешено выводить из очереди и приостанавливать более одного потока, см. Ниже.]

Итак, теперь для рабочего потока у нас есть:

    lock(mutex)
  loop:
      .... look for task, but if none pending ....
      pthread_cond_wait(cond, mutex)     
      goto loop
    ... if have task, pick it up ...
    unlock(mutex)

и для создания задачи :

    lock(mutex)
      enqueue(task)    
      pthread_cond_signal(cond)
    unlock(mutex)

где cond занимает место явной очереди ожидающих потоков.

Теперь pthread_cond_signal(cond) может находиться внутри или вне mutex. Если внутри, то концептуально, как только поток будет исключен из очереди cond, он запустится и немедленно заблокируется на мьютексе, что кажется пустой тратой. Но реализация могла бы сделать что-нибудь умное и просто передать перезапущенный поток (потоки) из одной очереди в другую.

Обратите внимание, что создатель задачи не знает, сколько существует приостановленных потоков, и ему все равно. POSIX сообщает, что функция pthread_cond_signal () должна:

  • ... разблокировать хотя бы один из потоков, которые заблокированы указанной переменной условия cond ( если какие-либо потоки заблокированы на cond ).

  • ... не действуют, если в настоящее время нет потоков, заблокированных на cond .

Обратите особое внимание на «разблокировать хотя бы один из потоков». Опять же, ошибочно думать о cond как о переменной. Было бы ошибкой думать о cond как о (скажем) флаге «задача готова», или счетчике, или о чем-то еще, что вы можете считать переменной. Это не так. Когда поток перезапускается после pthread_cond_wait(), то, чего он ожидал, могло произойти, а могло и не произойти, и если да, то другой поток мог попасть туда первым. Вот почему все, что вы читаете о (так называемых) «переменных состояния», будет говорить об их использовании внутри al oop и возврате в начало l oop (сразу после lock(mutex)) при возврате из pthread_cond_wait().

NB: когда поток перезапускается после pthread_cond_wait(), он может быть одним из нескольких, перезапускаемых одним pthread_cond_signal(), и да, кажется странным, что POSIX позволяет это - предположительно либо для соответствия какая-то историческая реализация c, или для упрощения реализации (возможно, связанной с приоритетом потока). Но даже если pthread_cond_wait() действительно гарантировал перезапуск только одного потока, перезапущенный поток мог бы восстановить мьютекс после какого-либо другого рабочего потока, таким образом:

     Worker 1               |  Worker 2               |  Task Queue
       busy                 |    busy                 |    empty
       lock(mutex)          |    .                    |    .
       + task queue empty   |    .                    |    lock(mutex)
       unlock(mutex) +      |    .                    |    -
                 wait(cond) |    .                    |    -
       ~                    |    lock(mutex)          |    + enqueue task
       ~                    |    -                    |    + signal(cond)
       re-lock(mutex)       |    -                    |    unlock(mutex)                      
       -                    |    + dequeue task       |    .
       -                    |    unlock(mutex)        |    empty
       + task queue empty ! |    busy                 |    .     

где + - где поток владеет мьютексом, - - это место, где он ожидает мьютекса, а ~ - это место, где он ожидает сигнала «cond».

Вы можете беспокоиться о том, что делать 1123 * каждый раз, когда новая задача ставится в очередь ... так что вы могли делать это только тогда, когда очередь задач была пустой. Вы должны суметь убедить себя, что это работает, особенно если это делается внутри мьютекса.

Использование sem_t или sem_t со счетчиком

В качестве альтернативы вы можете использовать sem_t для подсчета количества «задач - официанты». Каждый раз, когда новая задача добавляется в очередь, семафор увеличивается (sem_post). Каждый раз, когда работник завершает задачу, он резервирует следующую задачу или ждет (sem_wait). Вам по-прежнему нужен безопасный способ ставить и выводить задачи из очереди - скажем: блокировка (мьютекс), постановка в очередь (задача), разблокировка (мьютекс), публикация (сем); и: wait (sem), lock (mutex), dequeue (task), unlock (mutex).

Единственная трудность здесь в том, что максимальное значение семафора может быть всего 32767 - см. sysconf(_SC_SEM_VALUE_MAX).

Или вы можете использовать один sem_t и количество официантов. Итак, для рабочего потока у нас есть:

  loop:
    lock(mutex)
      .... look for task, but if none pending ....
      increment waiter-count
    unlock(mutex)
    sem_wait(sem)
    goto loop

, а для создания задачи:

    lock(mutex)
      enqueue(task)    
      kick = (waiter-count != 0)
      if (kick)
        decrement(waiter-count)
    unlock(mutex)
    if (kick)
      sem_post(sem)

sem_post() можно поместить внутри мьютекса, но лучше снаружи.

И у вас все в порядке, если у вас не более 32767 рабочих потоков (!).

Но, когда вы отмените выбор, вы увидите, что это (в основном) новое изобретение pthread_cond_wait / _signal(), и вряд ли будет более эффективным.

...