У меня есть такой код:
class MyTask {
public:
run(size_t pool_size) {
... // do some pre things
std::vector<std::string> name_list=read_a_list(); // read task list
std::vector<std::pair<std::string, double>> result_list; // name & time
boost::thread_pool pool(pool_size); // "pool_size" threads in pool
size_t max_task=2*pool_size; // max "2*pool_size" tasks in queue
size_t task_number=0; // using task_number to limit the number of tasks in queue
boost::mutex task_number_mu;
boost::condition_variable_any task_number_condition;
for(size_t idx=0;idx<name_list.size();++idx){
boost::unique_lock<boost::mutex> out_lock(task_number_mu);
task_number_condition.wait(out_lock, [&] {
return task_number < max_task;
});
++task_number;
boost::asio::post(pool,
[&,idx] {
{
boost::unique_lock<boost::mutex> in_lock(task_number_mu);
--task_number;
task_number_condition.notify_one();
}
std::string name=name_list[idx];
Timer timer; // a class using std::chrono to collect time
timer.Start();
A a=read_A_data(name+"_a.csv"); // one file
timer.Stop();
double time_a=timer.Elapsed();
B b=read_B_data(name+"_b"); // many files in "name_b" directory
timer.Stop();
double time_b=timer.Elapsed();
result_type result=do_some_selection(a,b); // very expensive function
timer.Stop();
double time_r=timer.Elapsed();
write_result(result,name+"_result.csv"); // one file
timer.Stop();
double time_w=timer.Elapsed();
... // output idx, time_{a,b,r,w} by boost log
{
boost::lock_guard<boost::mutex> lock(result_list_mu);
result_list.emplace_back(std::make_pair(name,time_w));
}
});//post
}//for
pool.join();
... // do some other things
} //run
public :
static A read_A_data(const std::string& name_a){
... // read "name_a" file, less than 1.5M
}
static B read_B_data(const std::string& name_b){
... // read files in "name_b" directory, more than 10 files, 1M~10M per file
}
static result_type do_some_selection(A a,B b){
result_type result;
for(const auto& i:b){
for(const auto& j:a){
if(b_fit_in_a(i,j)){ //b_fit_in_a() does not have loops
result.emplace_back(i);
}//if
}//for j
}//for i
return result;
}
static void write_result(const result_type& result, const std::string& name_r){
... // write result to "name_r", about 2M~15M
}
}
Когда я устанавливаю pool_size
в 1 (однопоточное), вывод времени будет выглядеть так:
1 14.7845 471.214 1491.16 1927.86
2 4.247 649.694 1327 1523.7
3 5.4375 924.407 2852.44 3276.1
4 4.1798 754.361 1078.97 1187.15
5 5.4944 1284.37 2935.02 3336.19
6 5.515 694.369 2825.79 3380.3
...
У меня есть Xeon-W, который является 16C32T, поэтому установите pool_size
на 8, и:
1 14.7919 2685.21 6600.4 7306.15
2 16.0127 2311.94 10517.2 12044.3
3 7.4403 2111.83 6210.49 7014.61
4 9.0292 2165.12 10482.5 11893
5 16.6851 1664.2 17282.7 20489.9
6 32.9876 6488.17 25730.6 25744.7
...
установите 16, и:
1 22.5189 5324.67 18018.6 20386
2 17.1096 8670.3 21245.8 23229.1
3 17.9065 10930.7 27335.3 29961.55
4 20.6321 5227.19 30733 34926
5 25.104 2372.04 13810.9 15916.7
6 39.6723 18734.3 79300.1 79393.5
...
установите 32, и:
1 39.3981 19159.7 43451.7 44527.1
2 51.1908 5693.48 43391.3 50314.4
3 42.4458 18068.6 59520.6 67359.4
4 44.1195 29214.7 70312.4 76902
5 64.1733 23071.1 86055.2 86146.7
6 44.1062 36277.5 89474.4 98104.7
...
Я понимаю, что многопоточные программы часто имеют проблемы с чтением / записью на диск, что объясняет увеличение time_a
, time_b
и time_w
. Но что меня смутило, так это то, что time_r
также сильно вырос. do_some_selection
является статической c функцией-членом, поэтому я не думаю, что потоки будут взаимодействовать, но кажется, что чем больше потоков я использую, тем больше времени займет одна задача. Что я сделал не так? Как я могу сделать такие задачи параллельными?