Я использую общий базовый класс has_threads
для управления любым типом, которому следует разрешить создание экземпляра boost::thread
.
Каждый экземпляр has_threads
обладает set
thread
с (для поддержки waitAll
и interruptAll
функций, которые я не включаю ниже), и должен автоматически вызывать removeThread
при завершении потока чтобы сохранить целостность set
.
В моей программе у меня есть только один из них. Потоки создаются с интервалом каждые 10 с, и каждый выполняет поиск в базе данных. Когда поиск завершен, поток запускается до конца и должен быть вызван removeThread
; с установленным мьютексом объект потока удаляется из внутреннего отслеживания. Я вижу, как это работает правильно с выводом ABC
.
Время от времени, однако, механизмы сталкиваются. removeThread
выполняется, возможно, дважды одновременно. Я не могу понять, почему это приводит к тупику . Все вызовы потоков с этой точки никогда не выводят ничего, кроме A
. [Стоит отметить, что я использую потокобезопасный stdlib и проблема остается, когда потоки IOS не используются.] Трассировки стека указывают, что мьютекс блокирует эти потоки, но почему блокировка не будет в конце концов освобождается первым потоком для второго, затем вторым для третьего и так далее?
Я упускаю что-то фундаментальное о том, как работает scoped_lock
? Есть ли здесь что-то очевидное, что я пропустил, что может привести к тупику, несмотря на (или даже из-за?) Использование блокировки мьютекса?
Извините за плохой вопрос, но, как я уверен, вы знаете, что почти невозможно представить реальные тестовые случаи для ошибок, подобных этой.
class has_threads {
protected:
template <typename Callable>
void createThread(Callable f, bool allowSignals)
{
boost::mutex::scoped_lock l(threads_lock);
// Create and run thread
boost::shared_ptr<boost::thread> t(new boost::thread());
// Track thread
threads.insert(t);
// Run thread (do this after inserting the thread for tracking so that we're ready for the on-exit handler)
*t = boost::thread(&has_threads::runThread<Callable>, this, f, allowSignals);
}
private:
/**
* Entrypoint function for a thread.
* Sets up the on-end handler then invokes the user-provided worker function.
*/
template <typename Callable>
void runThread(Callable f, bool allowSignals)
{
boost::this_thread::at_thread_exit(
boost::bind(
&has_threads::releaseThread,
this,
boost::this_thread::get_id()
)
);
if (!allowSignals)
blockSignalsInThisThread();
try {
f();
}
catch (boost::thread_interrupted& e) {
// Yes, we should catch this exception!
// Letting it bubble over is _potentially_ dangerous:
// http://stackoverflow.com/questions/6375121
std::cout << "Thread " << boost::this_thread::get_id() << " interrupted (and ended)." << std::endl;
}
catch (std::exception& e) {
std::cout << "Exception caught from thread " << boost::this_thread::get_id() << ": " << e.what() << std::endl;
}
catch (...) {
std::cout << "Unknown exception caught from thread " << boost::this_thread::get_id() << std::endl;
}
}
void has_threads::releaseThread(boost::thread::id thread_id)
{
std::cout << "A";
boost::mutex::scoped_lock l(threads_lock);
std::cout << "B";
for (threads_t::iterator it = threads.begin(), end = threads.end(); it != end; ++it) {
if ((*it)->get_id() != thread_id)
continue;
threads.erase(it);
break;
}
std::cout << "C";
}
void blockSignalsInThisThread()
{
sigset_t signal_set;
sigemptyset(&signal_set);
sigaddset(&signal_set, SIGINT);
sigaddset(&signal_set, SIGTERM);
sigaddset(&signal_set, SIGHUP);
sigaddset(&signal_set, SIGPIPE); // http://www.unixguide.net/network/socketfaq/2.19.shtml
pthread_sigmask(SIG_BLOCK, &signal_set, NULL);
}
typedef std::set<boost::shared_ptr<boost::thread> > threads_t;
threads_t threads;
boost::mutex threads_lock;
};
struct some_component : has_threads {
some_component() {
// set a scheduler to invoke createThread(bind(&some_work, this)) every 10s
}
void some_work() {
// usually pretty quick, but I guess sometimes it could take >= 10s
}
};