Как мне улучшить пул потоков, чтобы сделать его более безопасным? - PullRequest
0 голосов
/ 22 марта 2019

В настоящее время я изучаю основы пула потоков.Вот некоторые блоки кода, которые я написал с учетом некоторых примеров, найденных в Интернете:

SyncQueue.h

#ifndef SYNC_QUEUE_H
#define SYNC_QUEUE_H

#include <list>
#include <mutex>
#include <iostream>

template<typename T>
class SyncQueue {
public:
  SyncQueue();
  ~SyncQueue();
  SyncQueue(const SyncQueue&) = delete;
  SyncQueue& operator=(const SyncQueue &) = delete;
  void append(const T& data);
  T& get();
  unsigned long size();
  bool empty();
private:
  std::list<T> queue;
  std::mutex myMutex;
};
#endif

SyncQueue.cpp

#include "SyncQueue.h"

template<typename T>
SyncQueue<T>::SyncQueue():
  queue(),
  myMutex() {}

template<typename T>
SyncQueue<T>::~SyncQueue() {}

template<typename T>
void SyncQueue<T>::append(const T& data) {
  std::unique_lock<std::mutex> l(myMutex);
  queue.push_back(data);
}

template<typename T>
T& SyncQueue<T>::get() {
  std::unique_lock<std::mutex> l(myMutex);
  T& res = queue.front();
  queue.pop_front();
  return res;
}

template<typename T>
unsigned long SyncQueue<T>::size() {
  std::unique_lock<std::mutex> l(myMutex);
  return queue.size();
}

template<typename T>
bool SyncQueue<T>::empty() {
  std::unique_lock<std::mutex> l(myMutex);
  return queue.empty();
}

template class SyncQueue<std::function<void()>>;

ThreadPool.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <atomic>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
#include "SyncQueue.h"

class ThreadPool {
public:
  ThreadPool(unsigned long thrdAmount = 0);
  virtual ~ThreadPool();
  void appendTask(std::function<void()> func);
  unsigned long pendingTasks();
private:
  void runThread();
  unsigned int myThrdAmount;
  std::atomic<bool> done;
  SyncQueue<std::function<void()>> syncQueue;
  std::vector<std::thread> threads;
  std::condition_variable myCondVar;
  std::mutex myMutex;
};

#endif

ThreadPool.cpp

#include "ThreadPool.h"

ThreadPool::ThreadPool(unsigned long thrdAmount):
  myThrdAmount(0),
  done(false),
  syncQueue(),
  threads(),
  myCondVar(),
  myMutex() {
  if (thrdAmount > 0) {
    myThrdAmount = thrdAmount;
  } else {
    myThrdAmount = std::thread::hardware_concurrency();
  }
  for (unsigned int i = 0; i < myThrdAmount; i++) {
    threads.push_back(std::thread(&ThreadPool::runThread, this));
  }
}

ThreadPool::~ThreadPool() {
  done = true;
  myCondVar.notify_all();
  for (auto& thrd: threads) {
    if (thrd.joinable()) {
      thrd.join();
    }
  }
}

void ThreadPool::appendTask(std::function<void()> func) {
  syncQueue.append(func);
  {
    std::unique_lock<std::mutex> l(myMutex);
    myCondVar.notify_one();
  }
}

unsigned long ThreadPool::pendingTasks() {
  return syncQueue.size();
}

void ThreadPool::runThread() {
  while (!done) {
    if (syncQueue.empty()) {
      std::unique_lock<std::mutex> l(myMutex);
      myCondVar.wait(l);
      continue;
    }
    syncQueue.get()();
  }
}

main.cpp

#include <unistd.h>
#include <iostream>
#include "ThreadPool.h"

void print() {
  std::cout << "Hello World!" << std::endl;
}

int main(int argc, char const *argv[]) {
  ThreadPool p;
  for (int i = 0; i < 20; i++) {
    p.appendTask(print);
  }
  std::cout << "Pending: " << p.pendingTasks() << std::endl;
  sleep(5);
  for (int i = 0; i < 20; i++) {
    p.appendTask(print);
  }
  return 0;
}

Несмотря на то, что все операции на SyncQueue заблокированы мьютексом ипеременная условия ThreadPool также защищена мьютексом, код часто приводит к неопределенному поведению.

Тем не менее, не могли бы вы объяснить, где в коде отсутствует безопасность потоков?Как я должен улучшить это?

1 Ответ

1 голос
/ 22 марта 2019
 void ThreadPool::appendTask(std::function<void()> func) {
  syncQueue.append(func);
  {
    std::unique_lock<std::mutex> l(myMutex);
    myCondVar.notify_one();
  }
}

void ThreadPool::runThread() {
  while (!done) {
    if (syncQueue.empty()) {
      std::unique_lock<std::mutex> l(myMutex);
      myCondVar.wait(l);
      continue;
    }
    syncQueue.get()();
  }
}

Проблема в том, что myMutex на самом деле ничего не защищает. Таким образом, ваш код имеет катастрофическое состояние гонки в ожидании очереди.

Рассмотрим:

  1. Тема вызова runThread видит syncQueue пусто.
  2. Поток вызова appendTask добавляет задание в очередь и вызывает notify_one. Нет темы для уведомления.
  3. Поток, вызывающий runThread, наконец, получает блокировку на myMutex и ожидает переменную условия, но очередь не пуста.

Крайне важно, чтобы переменная условия, которую вы используете для ожидания, была связана с мьютексом, который защищает ожидаемый вами предикат. Вся цель условной переменной состоит в том, чтобы позволить вам атомарно разблокировать предикат и ожидать сигнала без состояния гонки. Но вы похоронили предикат внутри syncQueue, победив логику обработки блокировки условной переменной.

Вы можете исправить это состояние гонки, сделав все вызовы в syncQueue под защитой мьютекса myMutex. Но может иметь смысл сделать syncQueue ожидаемым. Это может затруднить закрытие пула потоков.

...