Пробуждение нескольких потоков для работы один раз для каждого условия - PullRequest
0 голосов
/ 27 мая 2020

У меня есть ситуация, когда одному потоку нужно время от времени разбудить несколько рабочих потоков, и каждый рабочий поток должен выполнить свою работу (только) один раз, а затем go снова засыпать, чтобы дождаться следующего уведомления. Я использую condition_variable, чтобы все разбудить, но проблема, с которой я столкнулся, - это "только один раз". Предположим, что создать каждый поток сложно, поэтому я не хочу просто создавать и присоединять их каждый раз.

// g++ -Wall -o threadtest -pthread threadtest.cpp
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

std::mutex condMutex;
std::condition_variable condVar;
bool dataReady = false;

void state_change_worker(int id)
{
    while (1)
    {
        {
            std::unique_lock<std::mutex> lck(condMutex);
            condVar.wait(lck, [] { return dataReady; });
            // Do work only once.
            std::cout << "thread " << id << " working\n";
        }
    }
}

int main()
{
    // Create some worker threads.
    std::thread threads[5];
    for (int i = 0; i < 5; ++i)
        threads[i] = std::thread(state_change_worker, i);

    while (1)
    {
        // Signal to the worker threads to work.
        {
            std::cout << "Notifying threads.\n";
            std::unique_lock<std::mutex> lck(condMutex);
            dataReady = true;
            condVar.notify_all();
        }
        // It would be really great if I could wait() on all of the 
        // worker threads being done with their work here, but it's 
        // not strictly necessary.
        std::cout << "Sleep for a bit.\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

Обновление: вот версия, реализующая почти, но не совсем работающую версия отряда блокировки. Проблема в том, что я не могу гарантировать, что у каждого потока будет возможность проснуться и подсчитать количество в waitForLeader (), прежде чем он снова запустится.

// g++ -Wall -o threadtest -pthread threadtest.cpp
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

class SquadLock
{
public:
    void waitForLeader()
    {
        {
            // Increment count to show that we are waiting in queue.
            // Also, if we are the thread that reached the target, signal
            // to the leader that everything is ready.
            std::unique_lock<std::mutex> count_lock(count_mutex_);
            std::unique_lock<std::mutex> target_lock(target_mutex_);
            if (++count_ >= target_)
                count_cond_.notify_one();
        }
        // Wait for leader to signal done.
        std::unique_lock<std::mutex> lck(done_mutex_);
        done_cond_.wait(lck, [&] { return done_; });
        {
            // Decrement count to show that we are no longer waiting.
            // If we are the last thread set done to false.
            std::unique_lock<std::mutex> lck(count_mutex_);
            if (--count_ == 0)
            {
                done_ = false;
            }
        }
    }

    void waitForHerd()
    {
        std::unique_lock<std::mutex> lck(count_mutex_);
        count_cond_.wait(lck, [&] { return count_ >= target_; });
    }
    void leaderDone()
    {
        std::unique_lock<std::mutex> lck(done_mutex_);
        done_ = true;
        done_cond_.notify_all();
    }
    void incrementTarget()
    {
        std::unique_lock<std::mutex> lck(target_mutex_);
        ++target_;
    }
    void decrementTarget()
    {
        std::unique_lock<std::mutex> lck(target_mutex_);
        --target_;
    }
    void setTarget(int target)
    {
        std::unique_lock<std::mutex> lck(target_mutex_);
        target_ = target;
    }

private:
    // Condition variable to indicate that the leader is done.
    std::mutex done_mutex_;
    std::condition_variable done_cond_;
    bool done_ = false;

    // Count of currently waiting tasks.
    std::mutex count_mutex_;
    std::condition_variable count_cond_;
    int count_ = 0;

    // Target number of tasks ready for the leader.
    std::mutex target_mutex_;
    int target_ = 0;
};

SquadLock squad_lock;
std::mutex print_mutex;
void state_change_worker(int id)
{
    while (1)
    {
        // Wait for the leader to signal that we are ready to work.
        squad_lock.waitForLeader();
        {
            // Adding just a bit of sleep here makes it so that every thread wakes up, but that isn't the right way.
            // std::this_thread::sleep_for(std::chrono::milliseconds(100));
            std::unique_lock<std::mutex> lck(print_mutex);
            std::cout << "thread " << id << " working\n";
        }
    }
}

int main()
{

    // Create some worker threads and increment target for each one
    // since we want to wait until all threads are finished.
    std::thread threads[5];
    for (int i = 0; i < 5; ++i)
    {
        squad_lock.incrementTarget();
        threads[i] = std::thread(state_change_worker, i);
    }
    while (1)
    {
        // Signal to the worker threads to work.
        std::cout << "Starting threads.\n";
        squad_lock.leaderDone();
        // Wait for the worked threads to be done.
        squad_lock.waitForHerd();
        // Wait until next time, processing results.
        std::cout << "Tasks done, waiting for next time.\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

1 Ответ

1 голос
/ 27 мая 2020

Ниже приводится отрывок из моего блога, посвященного шаблонам параллельного проектирования. Шаблоны выражаются с использованием языка Ada, но концепции могут быть переведены на C ++.

Резюме

Многие приложения состоят из групп взаимодействующих потоков выполнения. Исторически это часто достигалось путем создания группы взаимодействующих процессов. Эти процессы будут сотрудничать путем обмена данными. Сначала для обмена данными использовались только файлы. Совместное использование файлов представляет некоторые интересные проблемы. Если один процесс выполняет запись в файл, а другой процесс читает из файла, вы часто будете сталкиваться с повреждением данных, потому что процесс чтения может попытаться прочитать данные до того, как процесс записи полностью запишет информацию. Для этого использовалось решение, заключающееся в создании блокировок файлов, чтобы только один процесс мог открыть файл за раз. Unix представил концепцию канала, который фактически представляет собой очередь данных. Один процесс может писать в канал, а другой читает из канала. Операционная система обрабатывает данные в конвейере как серию байтов. Это не позволяет процессу чтения получить доступ к определенному байту данных, пока процесс записи не завершит свою операцию с данными. Различные операционные системы также представили другие механизмы, позволяющие процессам обмениваться данными. Примеры включают очереди сообщений, сокеты и разделяемую память. Существовали также специальные функции, помогающие программистам контролировать доступ к данным, таким как семафоры. Когда операционные системы представили возможность для одного процесса управлять несколькими потоками выполнения, также известными как легкие потоки или просто потоки, они также должны были предоставить соответствующие механизмы блокировки для общих данных. Опыт показывает, что, хотя разнообразие возможных схем для общих данных довольно велико, существует несколько очень распространенных шаблонов проектирования, которые часто возникают. В частности, существует несколько вариантов блокировки или семафора, а также несколько вариантов буферизации данных. В этой статье исследуются шаблоны проектирования блокировки и буферизации для потоков в контексте монитора. Хотя мониторы могут быть реализованы на многих языках, все примеры в этой статье представлены с использованием защищенных типов Ada. Типы, защищенные в Ada, представляют собой очень тщательную реализацию монитора.

Мониторы

Существует несколько теоретических подходов к созданию и управлению разделяемой памятью. Один из самых гибких и надежных - это монитор, впервые описанный C .AR Hoare. Монитор - это объект данных с тремя различными типами операций.

Процедуры используются для изменения состояния или значений, содержащихся в мониторе. Когда поток вызывает процедуру монитора, этот поток должен иметь монопольный доступ к монитору, чтобы предотвратить обнаружение другими потоками поврежденных или частично записанных данных.

Записи, как и процедуры, используются для изменения состояния или значений, содержащихся в monitor, но запись также определяет граничное условие. Запись может быть выполнена только при выполнении граничного условия. Потоки, которые вызывают запись, когда граничное условие ложно, помещаются в очередь до тех пор, пока граничное условие не станет истинным. Записи используются, например, чтобы разрешить потоку читать из общего буфера. Читающему потоку не разрешается читать данные до тех пор, пока буфер действительно не будет содержать некоторые данные. Граничным условием будет то, что буфер не должен быть пустым. Записи, как и процедуры, должны иметь монопольный доступ к данным монитора.

Функции используются для сообщения о состоянии монитора. Поскольку функции только сообщают о состоянии и не изменяют его, им не требуется монопольный доступ к данным монитора. Многие потоки могут одновременно обращаться к одному и тому же монитору с помощью функций без опасности повреждения данных.

Концепция монитора чрезвычайно эффективна. Это также может быть чрезвычайно эффективным. Мониторы предоставляют все возможности, необходимые для разработки эффективных и надежных общих структур данных для многопоточных систем. Хотя мониторы мощные, у них есть некоторые ограничения. Операции, выполняемые на мониторе, должны быть очень быстрыми, без возможности создания блока потока. Если эти операции будут заблокированы, монитор станет преградой, а не средством коммуникации. Все потоки, ожидающие доступа к монитору, будут заблокированы до тех пор, пока блокируется работа монитора. По этой причине некоторые люди предпочитают не использовать мониторы. Существуют шаблоны проектирования мониторов, которые можно использовать для решения этих проблем. Эти шаблоны проектирования сгруппированы вместе как шаблоны блокировки.

Блокировка отряда

Блокировка отряда позволяет специальной задаче (командиру отряда) отслеживать прогресс стада или группа рабочих задач. Когда все (или достаточное количество) рабочих задач выполнены с некоторыми аспектами своей работы, и лидер готов продолжить, весь набор задач может пройти барьер и продолжить следующую последовательность своих действий. Цель состоит в том, чтобы позволить задачам выполняться асинхронно, но при этом координировать их продвижение посредством сложного набора действий.

package Barriers is
   protected type Barrier(Trigger : Positive) is
      entry Wait_For_Leader; 
      entry Wait_For_Herd; 
      procedure Leader_Done; 
   private
      Done : Boolean := False;
   end Barrier;

   protected type Autobarrier(Trigger : Positive) is
      entry Wait_For_Leader; 
      entry Wait_For_Herd; 
   private
      Done : Boolean := False;
   end Autobarrier;
end Barriers;

Этот пакет показывает два вида групповой блокировки. Тип с защитой Барьер демонстрирует базовый c отрядный замок. Стадо вызывает Wait_For_Leader, а лидер вызывает Wait_For_Herd, а затем Leader_Done. Autobarrier демонстрирует более простой интерфейс. Стадо вызывает Wait_For_Leader, а лидер вызывает Wait_For_Herd. Параметр Trigger используется при создании экземпляра любого типа барьера. Он устанавливает минимальное количество задач стада, которые лидер должен подождать, прежде чем он сможет продолжить.

package body Barriers is
   protected body Barrier is
      entry Wait_For_Herd when Wait_For_Leader'Count >= Trigger is
      begin
         null;
      end Wait_For_Herd;

      entry Wait_For_Leader when Done is
      begin
         if Wait_For_Leader'Count = 0 then
            Done := False;
         end if;
      end Wait_For_Leader;

      procedure Leader_Done is
      begin
         Done := True;
      end Leader_Done;
   end Barrier;

   protected body Autobarrier is
      entry Wait_For_Herd when Wait_For_Leader'Count >= Trigger is
      begin
         Done := True;
      end Wait_For_Herd;

      entry Wait_For_Leader when Done is
      begin
         if Wait_For_Leader'Count = 0 then
            Done := False;
         end if;
      end Wait_For_Leader;
   end Autobarrier;
end Barriers;
...