Блокировка / заморозка в параллельном коде c ++ с использованием Intel TBB - PullRequest
0 голосов
/ 22 марта 2011

Я попытался написать конвейерную версию Bitonic Sort, используя этапы чтения, сортировки и записи файлов, используя Intel TBB, как показано ниже. Код останавливается в спин-блокировке в то время как (! OutQueue.try_pop (line)); в фильтре FileWriter. Может кто-нибудь объяснить, почему это может быть?

обновление: Я провел дополнительное тестирование и обнаружил, что для internal_try_pop, который вызывается try_pop из заголовочного файла _concurrent_queue_internal.h, есть операция compare_and_swap, которая всегда завершается неудачей для этого конкретного try_pop. Ниже приведены значения, извлеченные мной из internal_try_pop

head counter(k)15
tail counter1605177747
item ticket(tk)15
k after head CAS 15
(k=tk)15,15---break!!

Я думаю, что значение счетчика хвостов - мусор. Единственная причина, по которой я могу думать в этой ситуации, заключается в том, что добавленное в очередь значение сортировщиком может быть неявно изменено, что делает его недоступным.

Есть идеи?

Спасибо:)

#include <iostream>
#include <sstream>
#include <string>
#include <algorithm>
#include <fstream>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"

using namespace tbb;
using namespace std;

// Filter that writes lines to the output file.
class FileWriterFilter: public tbb::filter {
string outPath;
public:
FileWriterFilter(string outPath );
/*override*/void* operator()( void* item );
};

FileWriterFilter::FileWriterFilter( string outPath ) :
tbb::filter(/*is_serial=*/true),
outPath(outPath)
{
}

void* FileWriterFilter::operator()( void* item ) {

concurrent_queue<string>& outQueue = *static_cast<concurrent_queue<string>*>(item);
string line;
while(!outQueue.try_pop(line));

ofstream myfile(outPath);
if (myfile.is_open())
{
    myfile <<line<<endl;
}
//myfile.close();
return NULL;

 }

 class FileReaderFilter: public tbb::filter {
public:

FileReaderFilter(string inPath);

private:
ifstream ifs;
tbb::concurrent_queue<string> queue;
/*override*/ void* operator()(void*);

 };

 FileReaderFilter::FileReaderFilter(string inPath ) :
filter(/*is_serial=*/true),
ifs(inPath)
{
}

 void* FileReaderFilter::operator()(void*) {

string temp;

if( getline( ifs, temp ))
{

    queue.push(temp);
}

return &queue;
 }

class BitonicSort: public tbb::filter{

public:
    BitonicSort();
/*override*/void* operator()( void* item );

size_t *a;

private : static const  bool ASCENDING=true, DESCENDING=false;

public :void sort(size_t *b,int n)
{
    a=b;
    bitonicSort(0,n,ASCENDING);

}

private: void bitonicSort(int lo,int n,bool dir)
{
    if(n>1)
    {
        int m=n/2;
        bitonicSort(lo,m,ASCENDING);

        bitonicSort(lo+m,m,DESCENDING);
        bitonicMerge(lo,n,dir);


    }

}

private : void bitonicMerge(int lo,int n,bool dir)
     {
         if(n>1)
         {
             int m=n/2;
             for(int i=lo;i<lo+m;i++)
             {
                 compare(i,i+m,dir);

             }
             bitonicMerge(lo,m,dir);
             bitonicMerge(lo+m,m,dir);
         }
     }

private : void compare(int i,int j, bool dir)
          {
              if(dir==a[i]>a[j])
              {
                  exchange(i,j);

              }

          }

private : void exchange(int i,int j)
          {
            /*  cout<<a[i]<<" "<<a[j]<<endl;*/
              int t=a[i];
              a[i]=a[j];
              a[j]=t;
              /*cout<<a[i]<<" "<<a[j]<<endl<<endl;*/
          }

   private :string convertInt(int number)
   {
   stringstream ss;//create a stringstream
   ss << number;//add number to the stream
   return ss.str();//return a string with the contents of the stream
}
 };


 BitonicSort::BitonicSort() :
tbb::filter(/*serial=*/false)
 {}

 /*override*/void* BitonicSort::operator()( void* item ) {

int num_elem=2048;
size_t *max = new size_t[num_elem];
concurrent_queue<string>& queue = *static_cast<concurrent_queue<string>*>(item);
concurrent_queue<string> outQueue;
string line;
while(!queue.try_pop(line));  
istringstream iss(line);
int i=0;
do
{
    string sub;
    iss >> sub;
    max[i]=atoi(sub.c_str());;
    i++;
} while (iss);

sort(max,num_elem);

string out;

for(int i=0;i<num_elem;i++)
{
    out.append(convertInt(max[i]).append(" "));
}

outQueue.push(out);

return &outQueue;
 }


 int main() {

tbb::pipeline pipeline;

FileReaderFilter reader("sample.txt");
pipeline.add_filter(reader);

BitonicSort sorter;
pipeline.add_filter(sorter);

FileWriterFilter writer("test.txt");
pipeline.add_filter(writer);

pipeline.run(3);


pipeline.clear();

system("PAUSE");
 }

1 Ответ

1 голос
/ 23 марта 2011

Я нашел это!Это была довольно тривиальная ошибка.Я объявил 2-й concurrent_queue как переменную метода в методе оператора фильтра сортировщика конвейера.Таким образом, каждый раз, когда оператор выполняет метод, очередь повторно инициализируется, аннулируя указатель, отправленный фильтру записи.очередь должна быть переменной класса фильтра сортировки, и все работает нормально.В модуле записи файлов была еще одна ошибка, которая была изменена в следующем.`

   #include <string>
   #include <algorithm>
   #include <fstream>
   #include "tbb\parallel_for.h"
   #include "tbb\blocked_range.h"
   #include "tbb\pipeline.h"
   #include "tbb\concurrent_queue.h"
   #include "tbb\task_scheduler_init.h"
   #include "tbb\tbb_thread.h"
   #include "tbb\task.h"
   #include <iostream>
   #include <sstream>

  using namespace tbb;
  using namespace std;


 // Filter that writes lines to the output file.
 class FileWriterFilter: public tbb::filter {

public:
int count;
FileWriterFilter(FILE* outFile);

private:
    FILE* outFile;

/*override*/void* operator()( void* item );
};

FileWriterFilter::FileWriterFilter(FILE* outFile) :
tbb::filter(/*is_serial=*/true),
outFile(outFile),count(0)
{
}

/*override*/void* FileWriterFilter::operator()( void* item ) {

tbb::concurrent_queue<string> &outQueue = *static_cast<tbb::concurrent_queue<string>*>   (item);
string outLine;

while(!outQueue.try_pop(outLine))
    this_tbb_thread::yield();

fprintf(outFile,outLine.append("\n").c_str());

count++;
if(count==10000){
    cout<<"over"<<endl;

}
return NULL;

 }


class FileReaderFilter: public tbb::filter {
public:
    FileReaderFilter(char* inPath);

private:
int count;
ifstream ifs;
tbb::concurrent_queue<string> queue;
/*override*/ void* operator()(void*);

 };

FileReaderFilter::FileReaderFilter(char* inPath ) :
filter(/*is_serial=*/true),
ifs(inPath),count(0)
{
}

/*override*/void* FileReaderFilter::operator()(void*) {

string temp;
count++;
if(count<=10000){
    if( getline( ifs, temp ))
    {

           queue.push(temp);

    }
    return &queue;
 }
else{
    return NULL;
}
}


class bitonicMerger : public tbb::task{
int lo;
int n;
bool dir;
size_t* a;
private : static const  bool ASCENDING=true, DESCENDING=false;

public:
    bitonicMerger(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}


    task* execute() {
    if(n>1)
    {                   
        int m=n/2;
        for(int i=lo;i<lo+m;i++)
        {
         compare(i,i+m,dir);
        }       

        int count = 1;
        tbb::task_list list;
        ++count;
        list.push_back( *new( allocate_child() ) bitonicMerger(lo,m,dir,a) );
        ++count;
        list.push_back( *new( allocate_child() ) bitonicMerger(lo+m,m,dir,a) );
        set_ref_count(count);
        spawn_and_wait_for_all(list);
    }
    return NULL;
}

   private : void compare(int i,int j, bool dir)
          {
              if(dir==a[i]>a[j])
              {
                  exchange(i,j);

              }

          }

private : void exchange(int i,int j)
          {

              int t=a[i];
              a[i]=a[j];
              a[j]=t;

          }

   };


 class bitonicSorter : public tbb::task{
int lo;
int n;
bool dir;
size_t* a;

private : static const  bool ASCENDING=true, DESCENDING=false;

public:
    bitonicSorter(int lo_,int n_,bool dir_,size_t* a_): lo(lo_), n(n_),dir(dir_), a(a_) {}

task* execute() {
    if(n>1)
    {                   
        int m=n/2;
        int count = 1;
        tbb::task_list list;
        ++count;
        list.push_back( *new( allocate_child() ) bitonicSorter(lo,m,ASCENDING,a) );
        ++count;
        list.push_back( *new( allocate_child() ) bitonicSorter(lo+m,m,DESCENDING,a) );
        set_ref_count(count);
        spawn_and_wait_for_all(list);



        count = 1;
        tbb::task_list list1;
        ++count;
        list1.push_back( *new( allocate_child() ) bitonicMerger(lo,n,dir,a) );
        set_ref_count(count);
        spawn_and_wait_for_all(list1);

    }
    return NULL;
}

 };




 class TBitonicSort : public tbb::filter{


public:
    TBitonicSort();
/*override*/void* operator()( void* item );


size_t *a;

private : static const  bool ASCENDING=true, DESCENDING=false;
private : tbb::concurrent_queue<string> outQueue;

public :void sort(size_t *b,int n)
{
    a=b;        
    bitonicSorter& tt = *new(tbb::task::allocate_root())   bitonicSorter(0,n,ASCENDING,a);
    tbb::task::spawn_root_and_wait(tt); 
}




};

string convertInt(int number)
{
   stringstream ss;//create a stringstream
   ss << number;//add number to the stream
   return ss.str();//return a string with the contents of the stream
}




TBitonicSort::TBitonicSort() :
filter(/*is_serial=*/true) 
{}

/*override*/void* TBitonicSort::operator()( void* item ) {

int num_elem=2048;
size_t *max = new size_t[num_elem];
tbb::concurrent_queue<string>& queue = *static_cast<tbb::concurrent_queue<string>*>(item);

string line;

while(!queue.try_pop(line))
    this_tbb_thread::yield();   

istringstream iss(line);
int i=0;
do
{
    string sub;
    iss >> sub;
    max[i]=atoi(sub.c_str());;
    i++;
} while (iss);

sort(max,num_elem);

string out;


for(int i=0;i<num_elem;i++)
{
    out.append(convertInt(max[i]).append(" "));
}


outQueue.push(out);

return &outQueue;
}

int run_pipe(int threads)
{
    FILE* output_file = fopen("test.txt","w");
    if( !output_file ) {
        perror( "test.txt" );
        return 0;
    }
    char* input_file="sample.txt";


    tbb::pipeline pipeline;

    FileReaderFilter reader(input_file);
    pipeline.add_filter(reader);

    TBitonicSort sorter;
    pipeline.add_filter(sorter);

    FileWriterFilter writer(output_file);
    pipeline.add_filter(writer);

    tbb::tick_count t0 = tbb::tick_count::now();
    pipeline.run(threads);
    tbb::tick_count t1 = tbb::tick_count::now();

    fclose( output_file );
    pipeline.clear();

    if(threads==1){
         printf("serial run   time = %g\n", (t1-t0).seconds());
    }
    else{
        printf("parallel run time = %g\n", (t1-t0).seconds());
    }

    return 0;
 }

int main() {

int threads[2]={1,3};


for(int i=0;i<2;i++)
{
    run_pipe(threads[i]);
}
system("PAUSE");
}
...