Как использовать std :: condition_variable в цикле - PullRequest
3 голосов
/ 28 июня 2019

Я пытаюсь реализовать некоторый алгоритм, используя потоки, которые должны быть синхронизированы в какой-то момент.Более или менее последовательность для каждого потока должна быть:

1. Try to find a solution with current settings.
2. Synchronize solution with other threads.
3. If any of the threads found solution end work.
4. (empty - to be inline with example below)
5. Modify parameters for algorithm and jump to 1.

Вот игрушечный пример с алгоритмом, измененным на генерацию случайных чисел - все потоки должны завершиться, если хотя бы один из них найдет 0.

#include <iostream>
#include <condition_variable>
#include <thread>
#include <vector>

const int numOfThreads = 8;

std::condition_variable cv1, cv2;
std::mutex m1, m2;
int lockCnt1 = 0;
int lockCnt2 = 0;

int solutionCnt = 0;

void workerThread()
{
    while(true) {
        // 1. do some important work
        int r = rand() % 1000;

        // 2. synchronize and get results from all threads
        {
            std::unique_lock<std::mutex> l1(m1);
            ++lockCnt1;
            if (r == 0) ++solutionCnt; // gather solutions
            if (lockCnt1 == numOfThreads) {
                // last thread ends here
                lockCnt2 = 0;
                cv1.notify_all();
            }
            else {
                cv1.wait(l1, [&] { return lockCnt1 == numOfThreads; });
            }
        }

        // 3. if solution found then quit all threads
        if (solutionCnt > 0) return;

        // 4. if not, then set lockCnt1 to 0 to have section 2. working again
        {
            std::unique_lock<std::mutex> l2(m2);
            ++lockCnt2;
            if (lockCnt2 == numOfThreads) {
                // last thread ends here
                lockCnt1 = 0;
                cv2.notify_all();
            }
            else {
                cv2.wait(l2, [&] { return lockCnt2 == numOfThreads; });
            }
        }

        // 5. Setup new algorithm parameters and repeat.
    }
}

int main()
{
    srand(time(NULL));

    std::vector<std::thread> v;
    for (int i = 0; i < numOfThreads ; ++i) v.emplace_back(std::thread(workerThread));
    for (int i = 0; i < numOfThreads ; ++i) v[i].join();

    return 0;
}

У меня есть вопросы о разделах 2. и 4. из кода выше.

A) В разделе 2 происходит синхронизация всех потоков и сбор решений (если они найдены).Все делается с помощью переменной lockCnt1.По сравнению с однократным использованием condition_variable, мне трудно было установить lockCnt1 на ноль, чтобы иметь возможность повторно использовать этот раздел (2.) в следующий раз.Поэтому я представил раздел 4. Есть ли лучший способ сделать это (без введения раздела 4)?

B) Кажется, что все примеры показывают использование condition_variable, а не в контексте сценария «производитель-потребитель».Есть ли лучший способ синхронизировать все потоки в случае, когда все являются «производителями»?

Редактировать: Просто чтобы прояснить, я не хотел описывать детали алгоритма, так как это не важно здесь - в любом случае это необходимоиметь все решения или ни одного из данного выполнения цикла и смешивать их не допускается.Описанная последовательность выполнения должна соблюдаться, и вопрос заключается в том, как обеспечить такую ​​синхронизацию между потоками.

Ответы [ 2 ]

1 голос
/ 28 июня 2019

A) Вы не можете просто сбросить lockCnt1 в 0, просто продолжайте увеличивать его дальше. Условие lockCnt2 == numOfThreads затем изменяется на lockCnt2 % numOfThreads == 0. Затем вы можете сбросить блок № 4. В будущем вы могли бы также использовать std::experimental::barrier, чтобы заставить нити встретиться.

B) Я бы предложил использовать std::atomic для solutionCnt, а затем вы можете сбросить все остальные счетчики, мьютекс и переменную условия. Просто атомарно увеличивайте его на единицу в потоке, который нашел решение, а затем возвращайте. Во всех потоках после каждой итерации проверяйте, является ли значение больше нуля. Если это так, то вернитесь. Преимущество состоит в том, что потоки не должны регулярно встречаться, но могут попытаться решить их в своем собственном темпе.

0 голосов
/ 29 июня 2019

Из любопытства я попытался решить вашу проблему с помощью std::async. Для каждой попытки найти решение мы вызываем async. После завершения всех параллельных попыток мы обрабатываем обратную связь, корректируем параметры и повторяем. Важным отличием вашей реализации является то, что обратная связь обрабатывается в вызывающем (основном) потоке. Если обработка обратной связи занимает слишком много времени - или если мы вообще не хотим блокировать основной поток - тогда код в main() можно настроить так, чтобы он также вызывал std::async.

Код должен быть достаточно эффективным при условии, что реализация async использует пул потоков (например, это делает реализация Microsoft).

#include <chrono>
#include <future>
#include <iostream>
#include <vector>

const int numOfThreads = 8;

struct Parameters{};
struct Feedback {
    int result;
};

Feedback doTheWork(const Parameters &){
    // do the work and provide result and feedback for future runs
    return Feedback{rand() % 1000};
}

bool isSolution(const Feedback &f){
    return f.result == 0;
}

// Runs doTheWork in parallel. Number of parallel tasks is same as size of params vector
std::vector<Feedback> findSolutions(const std::vector<Parameters> &params){

    // 1. Run async tasks to find solutions. Normally threads are not created each time but re-used from a pool
    std::vector<std::future<Feedback>> futures;
    for (auto &p: params){
        futures.push_back(std::async(std::launch::async,
                                    [&p](){ return doTheWork(p); }));
    }

    // 2. Syncrhonize: wait for all tasks
    std::vector<Feedback> feedback(futures.size());
    for (auto nofRunning = futures.size(), iFuture = size_t{0}; nofRunning > 0; ){

        // Check if the task has finished (future is invalid if we already handled it during an earlier iteration)
        auto &future = futures[iFuture];
        if (future.valid() && future.wait_for(std::chrono::milliseconds(1)) != std::future_status::timeout){
            // Collect feedback for next attempt
            // Alternatively, we could already check if solution has been found and cancel other tasks [if our algorithm supports cancellation]
            feedback[iFuture] = std::move(future.get());
            --nofRunning;
        }

        if (++iFuture == futures.size())
            iFuture = 0;
    }
    return feedback;
}

int main()
{
    srand(time(NULL));

    std::vector<Parameters> params(numOfThreads);
    // 0. Set inital parameter values here

    // If we don't want to block the main thread while the algorithm is running, we can use std::async here too
    while (true){
        auto feedbackVector = findSolutions(params);
        auto itSolution = std::find_if(std::begin(feedbackVector), std::end(feedbackVector), isSolution);

        // 3. If any of the threads has found a solution, we stop
        if (itSolution != feedbackVector.end())
            break;

        // 5. Use feedback to re-configure parameters for next iteration
    }

    return 0;
}
...