Пул потоков блокирует основные потоки после некоторых циклов - PullRequest
1 голос
/ 23 февраля 2020

Я пытаюсь узнать, как работает многопоточность в C ++, и нашел реализацию, которую я использовал в качестве руководства для создания своей собственной реализации, однако после al oop или пары она блокируется.

I есть потокобезопасная очередь, в которой я получаю задания, назначенные пулу потоков.

Каждый поток выполняет эту функцию:

// Declarations
std::vector<std::thread> m_threads;
JobQueue m_jobs; // A queue with locks
std::mutex m_mutex;
std::condition_variable m_condition;
std::atomic_bool m_active;
std::atomic_bool m_started;
std::atomic_int m_busy;
///...

[this, threadIndex] {
    int numThread = threadIndex;

    while(this->m_active) {
        std::unique_ptr<Job> currJob;
        bool dequeued = false;
        {
            std::unique_lock<std::mutex> lock { this->m_mutex };
            this->m_condition.wait(lock, [this, numThread]() {
                return (this->m_started && !this->m_jobs.empty()) || !this->m_active;
            });
            if (this->m_active) {
                m_busy++;
                dequeued = this->m_jobs.dequeue(currJob);
            }
        }

        if (dequeued) {
            currJob->execute();
            {
                std::lock_guard<std::mutex> lock { this->m_mutex };
                m_busy--;
            }
            m_condition.notify_all();
        } else {
            {
                std::lock_guard<std::mutex> lock { this->m_mutex };
                m_busy--;
            }
        }
    }
}

, а l oop в основном:

while(1) {
        int numJobs = rand() % 10000;
        std::cout << "Will do " << numJobs << " jobs." << std::endl;
        while(numJobs--) {
            pool.assign([](){
                // some heavy calculation
             });
        }
        pool.waitEmpty();
        std::cout << "Done!" << std::endl; // chrono removed for readability
    }

В то время как метод waitEmpty описывается следующим образом:

std::unique_lock<std::mutex> lock { this->m_mutex };
this->m_condition.wait(lock, [this] {
    return this->empty();
});

И именно в этом методе ожидания код обычно зависает, так как тест внутри никогда больше не вызывается.

Я отладил его, изменил компонент уведомлений и все с места на место, но по некоторым причинам после некоторых циклов он всегда блокируется.

Обычно, но не всегда, он блокируется методом condition_variable.wait() это блокирует текущий поток, пока другой поток не работает, и очередь не становится пустой, но я также видел, как это происходит, когда я вызываю condition_variable.notify_all().

. Некоторые отладки помогли мне заметить, что при вызове notify_all() на рабская нить, тыс e wait() в главном потоке больше не тестируется.

Ожидаемое поведение - он не блокируется при зацикливании.

Я использую G++ 8.1.0 на Windows.

и вывод:

Will do 41 jobs.  
Done! Took 0ms!   
Will do 8467 jobs.


<main thread blocked>

Редактировать: Я исправил проблему На это указывает комментарий Пэдди: теперь m_busy-- также происходит, когда задание не отменено.

Редактировать 2: Выполнение этого на Linux не блокирует основной поток и работает, как ожидается. (g++ (Ubuntu 7.4.0-1ubuntu1~18.04.1) 7.4.0)

Редактировать 3: Как упоминалось в комментариях, исправлено deadlock до block, так как оно включает только одну блокировку

Edit 4: Как прокомментировал Jérôme Richard Я смог улучшить его, создав lock_guard вокруг m_busy--;, но теперь блоки кода на notify_all(), которые вызываются внутри присвоения метод. Вот метод назначения для справки:

template<class Func, class... Args>
    auto assign(Func&& func, Args&&... args) -> std::future<typename std::result_of<Func(Args...)>::type> {
        using jobResultType = typename std::result_of<Func(Args...)>::type;

        auto task = std::make_shared<std::packaged_task<jobResultType()>>(
            std::bind(std::forward<Func>(func), std::forward<Args>(args)...)
        );

        auto job = std::unique_ptr<Job>(new Job([task](){ (*task)(); }));
        std::future<jobResultType> result = task->get_future();

        m_jobs.enqueue(std::move(job));
        std::cout << " - enqueued";
        m_condition.notify_all();
        std::cout << " - ok!" << std::endl;

        return result;
    }

В одном из циклов последний вывод -

//...
 - enqueued - ok!
 - enqueued - ok!
 - enqueued

<blocked again>

Редактировать 5: С последними изменениями это не происходит в msbuild компиляторе.

Суть моей реализации здесь: https://gist.github.com/GuiAmPm/4be7716b7f1ea62819e61ef4ad3beb02

Вот также оригинальная статья, на которой я основал свою реализацию: https://roar11.com/2016/01/a-platform-independent-thread-pool-using-c14/

Любая помощь будет оценена.

1 Ответ

0 голосов
/ 24 февраля 2020

tl; dr: используйте std::lock_guard из m_mutex вокруг m_busy--, чтобы избежать непредвиденной блокировки условий ожидания.

Анализ

Прежде всего, обратите внимание, что проблема может происходят с одним потоком в пуле и только несколькими заданиями. Это означает, что существует проблема между главным потоком, который отправляет задания, и тем, который их выполняет. Используя GDB для дальнейшего анализа состояния программы, когда застревает условие ожидания, можно увидеть, что заданий нет, m_busy установлен в 0, и оба потока ожидают уведомлений. Это означает, что существует проблема параллелизма при условии ожидания между мастером и единственным рабочим на последнем задании, которое нужно выполнить. Добавив в ваш код глобальные атомные часы c, можно увидеть, что (почти во всех случаях) работник завершает все работы, прежде чем мастер сможет дождаться их завершения и выполнения.

Вот один из полученных практических сценариев (маркеры выполняются последовательно):

  • мастер запускает ожидающий вызов, и остаются оставшиеся задания
  • работник выполняет m_busy++, снимает с очереди последнее задание и выполнить его (m_busy теперь установлено в 1, а очередь заданий пуста)
  • мастер вычисляет предикат ожидающего вызова
  • мастер вызова ThreadPool::empty, и результат false из-за busy, установленного в 1
  • , выполняемого работником m_busy-- (m_busy теперь установлено в 0)
  • с того момента, мастер может ожидать возврата условия ( но подозревается , чтобы не делать этого)
  • рабочий уведомляет условие
  • мастер подозревает , что он может ждать условия только сейчас и чтобы это не было затронуто этим последним уведомлением (как нет Ожидания будут дальше)
  • В этот момент мастер больше не выполняет инструкции и будет ждать вечно
  • работник ждет условия и тоже будет ждать вечно

То, что уведомление не влияет на мастера, очень странно. Похоже, это связано с проблемами ограждения памяти. Более подробное объяснение можно найти здесь . Цитирую статью:

Даже если вы сделаете dataReady атомом c, его нужно изменить в мьютексе; если нет, то изменение ожидающего потока может быть опубликовано, но не правильно синхронизировано.

Таким образом, решение состоит в том, чтобы заменить инструкцию m_busy-- на следующие строки:

{
    std::lock_guard<std::mutex> lck {this->m_mutex};
    m_busy--;
}

Это позволит избежать предыдущего сценария. Действительно, с одной стороны, m_mutex получается во время проверки предиката вызова wait, предотвращающего изменение m_busy в этот конкретный c момент; с другой стороны, он обеспечивает правильную синхронизацию данных.

Теоретически безопаснее также включить в него вызов m_jobs.dequeue, но это сильно уменьшит степень параллелизма рабочих. На практике полезные синхронизации выполняются, когда блокировка снимается в рабочих потоках.

Обратите внимание, что одним из общих решений, позволяющих избежать подобных проблем, может быть добавление таймаута для ожидающих вызовов с использованием функции wait_for в oop для применения условия предиката. Однако это решение обеспечивает более высокую задержку ожидающих вызовов и, таким образом, может значительно замедлить выполнение.

...