Тупик с очередью блокировки и барьером в C ++ - PullRequest
0 голосов
/ 02 июня 2018

У меня есть очень простая и небольшая программа на C ++, которая создает пул потоков , затем помещает сообщений в очередь блокировки , разделяемую между потоками, чтобы сказать каждомупоток, что делать.

Сообщение может быть: -1 (конец потока -> завершить), -2 (барьер -> ждать, пока все потоки достигнутэто, затем продолжить), другие значения , чтобы сделать случайные вычисления.Цикл выполняется в следующем порядке: некоторые вычисления, барьер, некоторые вычисления, барьер, ..., барьер, конец потока, объединение потоков, выход.

Я не могу понять почемуЯ получаю тупик даже с двумя потоками в пуле.Очередь не может стать пустой, но порядок, в котором я нажимаю и выскакиваю сообщения, всегда приводит к пустой очереди!

Реализация блокирующей очереди - та, что предлагается здесь ( C ++ Эквивалент JavaBlockingQueue ) с добавлением всего двух методов.Я также копирую код очереди ниже.

Любая помощь?

Main.cpp

#include <iostream>
#include <vector>
#include <thread>
#include "Queue.hpp"

using namespace std;

// function executed by each thread
void f(int i, Queue<int> &q){
    while(1){
        // take a message from blocking queue
        int j= q.pop();
        // if it is end of stream then exit
        if (j==-1) break;
        // if it is barrier, wait for other threads to reach it
        if (j==-2){
            // active wait! BAD, but anyway...
            while(q.size() > 0){
                ;
            }
        }
        else{
            // random stuff
            int x = 0;
            for(int i=0;i<j;i++)
                x += 4;
        }
    }
}

int main(){
    Queue<int> queue; //blocking queue
    vector<thread> tids; // thread pool
    int nt = 2; // number of threads
    int dim = 8; // number to control number of operations

    // create thread pool, passing thread id and queue
    for(int i=0;i<nt;i++)
        tids.push_back(thread(f,i, std::ref(queue)));

    for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine

        // push random number
        for(int j=0;j<dist;j++){    
            queue.push(4);  
        }

        // push barrier code
        for(int i=0;i<nt;i++){
            queue.push(-2);
        }

        // active wait! BAD, but anyway...
        while (queue.size()>0){
                 ;
        }
    }
    // push end of stream
    for(int i=0;i<nt;i++)
        queue.push(-1);
    // join thread pool
    for(int i=0;i<nt;i++){
        tids[i].join();
    }           
return 0;
}

Queue.hpp

#include <deque>
#include <mutex>
#include <condition_variable>

template <typename T>
class Queue
{
private:
  std::mutex              d_mutex;
  std::condition_variable d_condition;
  std::deque<T>           d_queue;
public:

  void push(T const& value) {
    {
      std::unique_lock<std::mutex> lock(this->d_mutex);
      d_queue.push_front(value);
    }
    this->d_condition.notify_one();
  }

  T pop() {
    std::unique_lock<std::mutex> lock(this->d_mutex);
    this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
    T rc(std::move(this->d_queue.back()));
    this->d_queue.pop_back();
    return rc;
  }

  bool empty(){
      std::unique_lock<std::mutex> lock(this->d_mutex); 
      return this->d_queue.empty(); 
  }

  int size(){
    std::unique_lock<std::mutex> lock(this->d_mutex); 
    return this->d_queue.size();
  }
};

Ответы [ 2 ]

0 голосов
/ 02 июня 2018

Я запустил ваш код и понял проблему.Проблема с опцией «-2».Когда два потока достигают этой точки, ваш основной поток уже поместил другие значения в очередь.Таким образом, если ваша очередь увеличила свой размер между временем, когда ваши потоки получили значение «-2», и до того, как они достигли опции «-2», ваш код зависнет: Поток 1: получить -2.Тема 2: получить -2.Основная нить: нажать -1.Основная нить: нажать -1.Поток 1: подождите, пока вся очередь не станет пустой.Поток 2: подождите, пока вся очередь не будет пустой.

очередь: -1 -1

^ это в случае, если dim равно 1. В вашем коде dim равно 8, вы неЯ не хочу видеть, как это выглядит. Чтобы решить эту проблему, все, что я сделал, это отключил следующий цикл:

for(int i=0;i<nt;i++){
    queue.push(-2);
}

Когда этот пард отключен, код работает отлично.Вот как я это проверил:

std::mutex guarder;

// function executed by each thread
void f(int i, Queue<int> &q){
    while(1){
        // take a message from blocking queue
        guarder.lock();
        int j= q.pop();
        guarder.unlock();
        // if it is end of stream then exit
        if (j==-1) break;
        // if it is barrier, wait for other threads to reach it
        if (j==-2){
            // active wait! BAD, but anyway...
            while(q.size() > 0){
                ;
            }
        }
        else{
            // random stuff
            int x = 0;
            for(int i=0;i<j;i++)
                x += 4;
            guarder.lock();
            cout << x << std::endl;
            guarder.unlock();
        }
    }
}

int main(){
    Queue<int> queue; //blocking queue
    vector<thread> tids; // thread pool
    int nt = 2; // number of threads
    int dim = 8; // number to control number of operations

    // create thread pool, passing thread id and queue
    for(int i=0;i<nt;i++)
        tids.push_back(thread(f,i, std::ref(queue)));

    for(int dist=1; dist<=dim; dist++){ // without this outer loop the program works fine

        // push random number
        for(int j=0;j<dist;j++){
            queue.push(dist);
        }

        /*// push barrier code
        for(int i=0;i<nt;i++){
            queue.push(-2);
        }*/

        // active wait! BAD, but anyway...
        while (queue.size()>0){
            ;
        }
    }
    // push end of stream
    for(int i=0;i<nt;i++)
        queue.push(-1);
    // join thread pool
    for(int i=0;i<nt;i++){
        tids[i].join();
    }
    return 0;
}

Результат:

4
8
8
12
12
12
16
16
16
20
20
16
20
20
20
24
24
24
24
24
24
28
28
28
28
28
28
28
32
32
32
32
32
32
32
32

Кстати, зависание не произошло из-за вашей части "активного ожидания".Это не хорошо, но обычно вызывает другие проблемы (например, замедляет работу вашей системы).

0 голосов
/ 02 июня 2018

Я думаю, что проблема заключается в вашем активном ожидании, которое вы описываете как "ПЛОХО, но в любом случае ..." и используете размер очереди в качестве барьера вместо использования истинного барьера синхронизации

Для dim = 1 вы выдвигаете очередь с 4, -2, -2.Одна нить захватит 4 и -2, а другая - оставшиеся -2.На данный момент очередь пуста, и у вас есть три потока (два рабочих и основной поток), которые проводят активные гонки ожидания, чтобы увидеть, была ли очередь очищена.Существует мьютекс по размеру, который позволяет только одному читать размер за раз.Если основной поток запланирован первым и определит, что очередь пуста, он будет нажимать -1, -1, чтобы сигнализировать об окончании потока.Теперь очередь больше не пуста, но один или оба рабочих потока ожидают ее опустошения.Так как они ждут, пока он опустеет, перед тем, как взять другой элемент, очередь заблокирована в этом состоянии.

Для случая, когда dim> 1, вероятно, существует аналогичная проблема с помещением следующего набора значений в очередь в главном потоке до того, как обе работы подтвердят пустую очередь и выйдут из активного ожидания.

...