Я нашёл причину! Это была довольно тривиальная ошибка. Я объявил вторую concurrent_queue как локальную переменную внутри метода operator() фильтра сортировки конвейера. Поэтому при каждом вызове оператора очередь создавалась заново, и указатель, переданный фильтру записи, становился недействительным. Очередь должна быть членом класса фильтра сортировки — тогда всё работает нормально. В модуле записи файла была ещё одна ошибка; она исправлена в коде ниже.
#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>
#include "tbb\parallel_for.h"
#include "tbb\blocked_range.h"
#include "tbb\pipeline.h"
#include "tbb\concurrent_queue.h"
#include "tbb\task_scheduler_init.h"
#include "tbb\tbb_thread.h"
#include "tbb\task.h"
using namespace tbb;
using namespace std;
// Last pipeline stage: a tbb::filter that writes lines to a C stdio stream.
class FileWriterFilter : public tbb::filter {
public:
    // Number of lines written so far; advanced by operator().
    int count;

    // Binds the filter to `outFile`. The stream is only stored here; it is
    // neither opened nor closed by this class.
    FileWriterFilter(FILE* outFile);

private:
    // Destination stream for the lines.
    FILE* outFile;

    // Invoked by the pipeline with `item` pointing at a
    // tbb::concurrent_queue<string>; takes one line from it and writes it.
    /*override*/ void* operator()(void* item);
};
// Constructs the writer as a serial pipeline stage that writes to `outFile`.
//
// outFile: an already-open stream; it is stored, not opened or closed here.
FileWriterFilter::FileWriterFilter(FILE* outFile) :
    tbb::filter(/*is_serial=*/true),
    // Members are initialized in declaration order (count, then outFile)
    // regardless of how this list is written, so the list now mirrors that
    // order to avoid a misleading read and a -Wreorder warning.
    count(0),
    outFile(outFile)
{
}
// Writes one line to the output stream.
//
// item: pointer to the tbb::concurrent_queue<string> handed over by the
//       previous stage.
// Returns NULL: nothing is passed further down the pipeline.
/*override*/void* FileWriterFilter::operator()( void* item ) {
    tbb::concurrent_queue<string>& outQueue =
        *static_cast<tbb::concurrent_queue<string>*>(item);

    // Spin (yielding the CPU) until a line is available in the queue.
    string outLine;
    while (!outQueue.try_pop(outLine))
        this_tbb_thread::yield();

    // The line is data, never a format string: passing it as the format
    // argument would make any '%' in the file content undefined behaviour.
    fprintf(outFile, "%s\n", outLine.c_str());

    count++;
    if (count == 10000) {
        // Progress marker printed once the 10000th line has been written.
        cout << "over" << endl;
    }
    return NULL;
}
// First pipeline stage: a tbb::filter that reads text lines from a file and
// hands them downstream through a queue it owns.
class FileReaderFilter : public tbb::filter {
public:
    // Opens the file at `inPath` for reading.
    FileReaderFilter(char* inPath);

private:
    // Number of times operator() has been invoked.
    int count;

    // Input stream the lines are read from.
    ifstream ifs;

    // Holds the lines that were read but not yet consumed; operator() returns
    // the address of this member as the pipeline item.
    tbb::concurrent_queue<string> queue;

    // Reads the next line; the incoming item pointer is unused.
    /*override*/ void* operator()(void*);
};
// Constructs the reader as a serial pipeline stage over the file `inPath`.
//
// inPath: path of the text file to read; whether the open succeeded is not
//         checked here.
FileReaderFilter::FileReaderFilter(char* inPath ) :
    filter(/*is_serial=*/true),
    // Listed in member declaration order (count, then ifs), which is the
    // order initialization really happens in; avoids a -Wreorder warning.
    count(0),
    ifs(inPath)
{
}
// Produces the next input line for the pipeline.
//
// Returns a pointer to the internal queue after pushing one freshly read line
// onto it, or NULL once 10000 calls have been served or the input is
// exhausted. Per the TBB pipeline contract, NULL from the first filter marks
// the end of the stream.
/*override*/void* FileReaderFilter::operator()(void*) {
    count++;
    if (count > 10000) {
        // Hard cap on the number of lines processed per run.
        return NULL;
    }

    string temp;
    if (!getline(ifs, temp)) {
        // End of file (or read failure). Previously the queue was returned
        // without a new element, which left the next stage spinning forever
        // on an empty queue; stop the pipeline instead.
        return NULL;
    }

    queue.push(temp);
    return &queue;
}
// TBB task that merges the bitonic sequence a[lo .. lo+n) into sorted order.
//
// dir == true sorts ascending, dir == false sorts descending. The two halves
// are merged recursively by child tasks.
class bitonicMerger : public tbb::task {
    int lo;      // index of the first element of the range
    int n;       // number of elements in the range
    bool dir;    // requested direction: true = ascending
    size_t* a;   // array being sorted in place (not owned)

public:
    bitonicMerger(int lo_, int n_, bool dir_, size_t* a_)
        : lo(lo_), n(n_), dir(dir_), a(a_) {}

    // Compare-exchanges element i with element i + n/2 across the range, then
    // merges each half in a child task. Always returns NULL.
    task* execute() {
        if (n > 1) {
            int m = n / 2;
            for (int i = lo; i < lo + m; i++) {
                compare(i, i + m, dir);
            }

            tbb::task_list list;
            list.push_back(*new (allocate_child()) bitonicMerger(lo, m, dir, a));
            list.push_back(*new (allocate_child()) bitonicMerger(lo + m, m, dir, a));
            // Two children plus one for the wait performed below.
            set_ref_count(3);
            spawn_and_wait_for_all(list);
        }
        return NULL;
    }

private:
    // Swaps a[i] and a[j] when their current order contradicts `dir`.
    void compare(int i, int j, bool dir) {
        if (dir == (a[i] > a[j])) {
            exchange(i, j);
        }
    }

    // Swaps a[i] and a[j]. std::swap keeps the full size_t value; the former
    // `int` temporary truncated elements that do not fit in an int.
    void exchange(int i, int j) {
        std::swap(a[i], a[j]);
    }
};
// TBB task that sorts a[lo .. lo+n) in direction `dir` with a bitonic sort:
// the lower half is sorted ascending and the upper half descending by child
// tasks, then a bitonicMerger child merges the whole range.
class bitonicSorter : public tbb::task {
    int lo;      // index of the first element of the range
    int n;       // number of elements in the range
    bool dir;    // direction handed to the final merge
    size_t* a;   // array being sorted in place

private:
    static const bool ASCENDING = true, DESCENDING = false;

public:
    bitonicSorter(int lo_, int n_, bool dir_, size_t* a_)
        : lo(lo_), n(n_), dir(dir_), a(a_) {}

    // Runs the two half-sorts and then the merge; ranges of fewer than two
    // elements are left untouched. Always returns NULL.
    task* execute() {
        if (n <= 1) {
            return NULL;
        }

        const int half = n / 2;

        // Phase 1: sort both halves in opposite directions.
        tbb::task_list halves;
        halves.push_back(*new (allocate_child()) bitonicSorter(lo, half, ASCENDING, a));
        halves.push_back(*new (allocate_child()) bitonicSorter(lo + half, half, DESCENDING, a));
        set_ref_count(3);  // two children + one for the wait
        spawn_and_wait_for_all(halves);

        // Phase 2: merge the full range in the requested direction.
        tbb::task_list mergeStage;
        mergeStage.push_back(*new (allocate_child()) bitonicMerger(lo, n, dir, a));
        set_ref_count(2);  // one child + one for the wait
        spawn_and_wait_for_all(mergeStage);

        return NULL;
    }
};
class TBitonicSort : public tbb::filter{
public:
TBitonicSort();
/*override*/void* operator()( void* item );
size_t *a;
private : static const bool ASCENDING=true, DESCENDING=false;
private : tbb::concurrent_queue<string> outQueue;
public :void sort(size_t *b,int n)
{
a=b;
bitonicSorter& tt = *new(tbb::task::allocate_root()) bitonicSorter(0,n,ASCENDING,a);
tbb::task::spawn_root_and_wait(tt);
}
};
// Returns the decimal text representation of `number`, as produced by
// stream insertion (e.g. 42 -> "42", -7 -> "-7").
std::string convertInt(int number)
{
    std::ostringstream formatter;  // output-only stream used purely for formatting
    formatter << number;
    return formatter.str();
}
// Constructs the sorter as a serial pipeline stage.
// The public array pointer `a` starts out NULL instead of indeterminate; it
// is assigned on every call to sort().
TBitonicSort::TBitonicSort() :
    filter(/*is_serial=*/true),
    a(NULL)
{}
// Sorts the numbers contained in one input line.
//
// item: pointer to the tbb::concurrent_queue<string> of the previous stage.
// Returns a pointer to this filter's output queue after pushing the sorted
// line onto it.
//
// The line is parsed as whitespace-separated integers into a fixed buffer of
// 2048 elements. Missing elements are zero and tokens beyond the 2048th are
// ignored. All 2048 elements are sorted ascending and written back separated
// (and terminated) by a single space.
/*override*/void* TBitonicSort::operator()( void* item ) {
    const int num_elem = 2048;

    // Zero-initialized and released automatically. The previous new[] buffer
    // leaked on every line and was read uninitialized for short lines.
    std::vector<size_t> values(num_elem);

    tbb::concurrent_queue<string>& queue =
        *static_cast<tbb::concurrent_queue<string>*>(item);

    // Spin (yielding the CPU) until the previous stage's line is available.
    string line;
    while (!queue.try_pop(line))
        this_tbb_thread::yield();

    // Bounded parse: stop at the buffer size or at the first failed
    // extraction. The former do/while stored one extra element after the last
    // token and could write past the end of the buffer.
    istringstream iss(line);
    string token;
    for (int i = 0; i < num_elem && (iss >> token); i++) {
        values[i] = atoi(token.c_str());
    }

    sort(&values[0], num_elem);

    string out;
    for (int i = 0; i < num_elem; i++) {
        out.append(convertInt(values[i]));
        out.append(" ");
    }

    outQueue.push(out);
    return &outQueue;
}
// Builds the reader -> sorter -> writer pipeline over "sample.txt", runs it
// once, writes the result to "test.txt" and prints the elapsed time.
//
// threads: value forwarded to tbb::pipeline::run; a value of 1 is reported as
//          the "serial" run, anything else as the "parallel" run.
//          NOTE(review): tbb::pipeline::run documents this argument as the
//          maximum number of live tokens, not a thread count - confirm.
// Returns 0 in all cases, including when the output file cannot be opened
// (the failure is reported through perror).
int run_pipe(int threads)
{
    FILE* output_file = fopen("test.txt", "w");
    if (!output_file) {
        perror("test.txt");
        return 0;
    }

    // A mutable array rather than a pointer to a string literal:
    // FileReaderFilter takes a non-const char*, and binding a literal to
    // char* is ill-formed since C++11.
    char input_file[] = "sample.txt";

    tbb::pipeline pipeline;

    FileReaderFilter reader(input_file);
    pipeline.add_filter(reader);

    TBitonicSort sorter;
    pipeline.add_filter(sorter);

    FileWriterFilter writer(output_file);
    pipeline.add_filter(writer);

    // Time only the pipeline execution itself.
    tbb::tick_count t0 = tbb::tick_count::now();
    pipeline.run(threads);
    tbb::tick_count t1 = tbb::tick_count::now();

    fclose(output_file);
    // Detach the filters from the pipeline before they go out of scope.
    pipeline.clear();

    if (threads == 1) {
        printf("serial run time = %g\n", (t1 - t0).seconds());
    }
    else {
        printf("parallel run time = %g\n", (t1 - t0).seconds());
    }
    return 0;
}
// Entry point: runs the pipeline once with the argument 1 and once with 3,
// then issues the shell command "PAUSE".
int main() {
    const int runSettings[] = {1, 3};
    const int runCount = sizeof(runSettings) / sizeof(runSettings[0]);

    for (int run = 0; run < runCount; ++run) {
        run_pipe(runSettings[run]);
    }

    system("PAUSE");
}