Почему этот шаблон управления потоками может привести к тупику? - PullRequest
4 голосов
/ 08 декабря 2011

Я использую общий базовый класс has_threads для управления любым типом, которому следует разрешить создание экземпляра boost::thread.

Каждый экземпляр has_threads обладает set thread с (для поддержки waitAll и interruptAll функций, которые я не включаю ниже), и должен автоматически вызывать removeThread при завершении потока чтобы сохранить целостность set.

В моей программе у меня есть только один из них. Потоки создаются с интервалом каждые 10 с, и каждый выполняет поиск в базе данных. Когда поиск завершен, поток запускается до конца и должен быть вызван removeThread; с установленным мьютексом объект потока удаляется из внутреннего отслеживания. Я вижу, как это работает правильно с выводом ABC.

Время от времени, однако, механизмы сталкиваются. removeThread выполняется, возможно, дважды одновременно. Я не могу понять, почему это приводит к тупику . Все вызовы потоков с этой точки никогда не выводят ничего, кроме A. [Стоит отметить, что я использую потокобезопасный stdlib и проблема остается, когда потоки IOS не используются.] Трассировки стека указывают, что мьютекс блокирует эти потоки, но почему блокировка не будет в конце концов освобождается первым потоком для второго, затем вторым для третьего и так далее?

Я упускаю что-то фундаментальное о том, как работает scoped_lock? Есть ли здесь что-то очевидное, что я пропустил, что может привести к тупику, несмотря на (или даже из-за?) Использование блокировки мьютекса?

Извините за плохой вопрос, но, как я уверен, вы знаете, что почти невозможно представить реальные тестовые случаи для ошибок, подобных этой.

class has_threads {
    protected:
        template <typename Callable>
        void createThread(Callable f, bool allowSignals)
        {
            boost::mutex::scoped_lock l(threads_lock);

            // Create and run thread
            boost::shared_ptr<boost::thread> t(new boost::thread());

            // Track thread
            threads.insert(t);

            // Run thread (do this after inserting the thread for tracking so that we're ready for the on-exit handler)
            *t = boost::thread(&has_threads::runThread<Callable>, this, f, allowSignals);
        }

    private:

        /**
         * Entrypoint function for a thread.
         * Sets up the on-end handler then invokes the user-provided worker function.
         */
        template <typename Callable>
        void runThread(Callable f, bool allowSignals)
        {
            boost::this_thread::at_thread_exit(
                boost::bind(
                    &has_threads::releaseThread,
                    this,
                    boost::this_thread::get_id()
                )
            );

            if (!allowSignals)
                blockSignalsInThisThread();


            try {
                f();
            }
            catch (boost::thread_interrupted& e) {

                // Yes, we should catch this exception!
                // Letting it bubble over is _potentially_ dangerous:
                // http://stackoverflow.com/questions/6375121

                std::cout << "Thread " << boost::this_thread::get_id() << " interrupted (and ended)." << std::endl;
            }
            catch (std::exception& e) {
                std::cout << "Exception caught from thread " << boost::this_thread::get_id() << ": " << e.what() << std::endl;
            }
            catch (...) {
                std::cout << "Unknown exception caught from thread " << boost::this_thread::get_id() << std::endl;
            }
        }

        void has_threads::releaseThread(boost::thread::id thread_id)
        {
            std::cout << "A";
            boost::mutex::scoped_lock l(threads_lock);

            std::cout << "B";
            for (threads_t::iterator it = threads.begin(), end = threads.end(); it != end; ++it) {

                if ((*it)->get_id() != thread_id)
                    continue;

                threads.erase(it);
                break;
            }
            std::cout << "C";
        }

        void blockSignalsInThisThread()
        {
            sigset_t signal_set;
            sigemptyset(&signal_set);
            sigaddset(&signal_set, SIGINT);
            sigaddset(&signal_set, SIGTERM);
            sigaddset(&signal_set, SIGHUP);
            sigaddset(&signal_set, SIGPIPE); // http://www.unixguide.net/network/socketfaq/2.19.shtml
            pthread_sigmask(SIG_BLOCK, &signal_set, NULL);
        }


        typedef std::set<boost::shared_ptr<boost::thread> > threads_t;
        threads_t threads;

        boost::mutex threads_lock;
};

struct some_component : has_threads {
    some_component() {
        // set a scheduler to invoke createThread(bind(&some_work, this)) every 10s
    }

    void some_work() {
        // usually pretty quick, but I guess sometimes it could take >= 10s
    }
};

Ответы [ 3 ]

2 голосов
/ 08 декабря 2011

Возможно, вам потребуется сделать что-то вроде этого:

    void createThread(Callable f, bool allowSignals) 
    { 
        // Create and run thread 
        boost::shared_ptr<boost::thread> t(new boost::thread()); 

        {
            boost::mutex::scoped_lock l(threads_lock); 

            // Track thread 
            threads.insert(t);
        } 

        //Do not hold threads_lock while starting the new thread in case
        //it completes immediately

        // Run thread (do this after inserting the thread for tracking so that we're ready for the on-exit handler) 
        *t = boost::thread(&has_threads::runThread<Callable>, this, f, allowSignals); 
    } 

Другими словами, используйте thread_lock исключительно для защиты threads.

Обновление:

Чтобы подробнее остановиться на комментариях с предположениями о том, как работает boost :: thread, шаблоны блокировки могут выглядеть примерно так:

createThread:

  1. (createThread) получить threads_lock
  2. (boost::thread::opeator =) получить boost::thread внутренний замок
  3. (boost::thread::opeator =) освободить boost::thread внутренний замок
  4. (createThread) выпуск threads_lock

обработчик конца нити:

  1. (at_thread_exit) получить boost::thread внутренний замок
  2. (releaseThread) получить threads_lock
  3. (releaseThread) релиз threads_lock
  4. (at_thread_exit) разблокировать boost:thread внутренний замок

Если эти две блокировки boost::thread являются одной и той же блокировкой, вероятность возникновения тупика очевидна.Но это предположение, потому что большая часть кода повышения пугает меня, и я стараюсь не смотреть на него.

createThread можно / нужно переработать, чтобы переместить шаг 4 между шагами один и два и устранить потенциальную тупиковую ситуацию.

2 голосов
/ 08 декабря 2011

Ну, может произойти тупик, если тот же поток блокирует мьютекс, который он уже заблокировал (если вы не используете рекурсивный мьютекс).

Если часть выпуска вызывается во второй раз тем же потоком, как это происходит с вашим кодом, у вас тупик.

Я не изучил ваш код в деталях, но вам, вероятно, придется переделать ваш код (упростить?), Чтобы убедиться, что блокировка не может быть получена дважды одним и тем же потоком. Вы, вероятно, можете использовать защитную проверку владения замком ...

EDIT: Как сказано в моем комментарии и в ответе IronMensan, один из возможных случаев - остановка потока во время создания, вызов at_exit перед освобождением мьютекса, заблокированного в части создания вашего кода.

EDIT2:

Ну, с мьютексом и блокированной областью я могу только представить рекурсивную блокировку или блокировку, которая не снята. Это может произойти, если цикл становится бесконечным, например, из-за повреждения памяти.

Я предлагаю добавить больше журналов с идентификатором потока, чтобы проверить, есть ли рекурсивная блокировка или что-то странное. Тогда я проверю, что мой цикл правильный. Я также проверю, что at_exit вызывается только один раз для потока ...

Еще одна вещь, проверьте эффект стирания (таким образом вызывая деструктор) потока, находясь в функции at_exit ...

мои 2 цента

1 голос
/ 09 декабря 2011

Возможно, что созданный поток завершает работу до или во время выполнения оператора присваивания в createThread.Использование очереди событий или какой-либо другой структуры, которая может оказаться необходимой.Хотя более простое, но взломанное решение может также подойти.Не изменяйте createThread, поскольку вы должны использовать threads_lock для защиты threads как самого себя, так и thread объектов, на которые он указывает.Вместо этого измените runThread на это:

    template <typename Callable> 
    void runThread(Callable f, bool allowSignals) 
    { 
        //SNIP setup

        try { 
            f(); 
        } 
        //SNIP catch blocks

        //ensure that createThread is complete before this thread terminates
        boost::mutex::scoped_lock l(threads_lock);
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...