I'm working on a project that uses Boost MPI to build a Primary/Secondary (Master/Slave) system, where the Primary distributes work to the Secondaries in a loop. In roughly 30% of runs the program crashes with the following output:
runner(72828,0x7fff903c6380) malloc: *** error for object 0x7fae75b07af8: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
[Lappies:72828] *** Process received signal ***
[Lappies:72828] Signal: Segmentation fault: 11 (11)
[Lappies:72828] Signal code: Address not mapped (1)
[Lappies:72828] Failing at address: 0x8
[Lappies:72828] [ 0] 0 libsystem_platform.dylib 0x00007fff580a7f5a _sigtramp + 26
[Lappies:72828] [ 1] 0 libdyld.dylib 0x00007fff57d99292 dyld_stub_binder + 282
[Lappies:72828] [ 2] 0 libmpi.40.dylib 0x000000010be67339 ompi_comm_destruct + 32
[Lappies:72828] [ 3] 0 mca_pml_ob1.so 0x000000010d2c5308 mca_pml_ob1_iprobe + 549
[Lappies:72828] [ 4] 0 libmpi.40.dylib 0x000000010bea007f MPI_Iprobe + 284
[Lappies:72828] [ 5] 0 libboost_mpi-mt.dylib 0x000000010c005aae _ZNK5boost3mpi12communicator6iprobeEii + 62
[Lappies:72828] [ 6] [Lappies:72828] *** Process received signal ***
[Lappies:72828] Signal: Abort trap: 6 (6)
[Lappies:72828] Signal code: (0)
[Lappies:72828] [ 0] 0 run_hi 0x000000010b539155 _ZN9Secondary12recvMessagesEv + 69
0 libsystem_platform.dylib 0x00007fff580a7f5a _sigtramp + 26
[Lappies:72828] [ 1] [Lappies:72828] [ 7] 0 ??? 0x000000000000ffff 0x0 + 65535
[Lappies:72828] [ 2] 0 run_hi 0x000000010b526915 _ZN9Secondary3runEv + 53
[Lappies:72828] [ 8] 0 libsystem_c.dylib 0x00007fff57e451ae abort + 127
[Lappies:72828] [ 3] 0 run_hi 0x000000010b52ff43 _ZN7Primary9runWorkerEv + 35
[Lappies:72828] 0 libsystem_malloc.dylib 0x00007fff57f4ead4 szone_error + 596
[ 9] [Lappies:72828] [ 4] 0 run_hi 0x000000010b532bb1 _ZNSt3__114__thread_proxyINS_5tupleIJNS_10unique_ptrINS_15__thread_structENS_14default_deleteIS3_EEEEPFvvEEEEEEPvSA_ + 497
[Lappies:72828] [10] 0 libsystem_malloc.dylib 0x00007fff57f44721 tiny_free_list_remove_ptr + 298
[Lappies:72828] [ 5] 0 libsystem_pthread.dylib 0x00007fff580b1661 _pthread_body + 340
[Lappies:72828] [11] 0 libsystem_pthread.dylib 0x00007fff580b150d _pthread_body + 0
[Lappies:72828] [12] 0 libsystem_pthread.dylib 0x00007fff580b0bf9 thread_start + 13
[Lappies:72828] *** End of error message ***
0 libsystem_malloc.dylib 0x00007fff57f59aca tiny_free_no_lock + 1450
[Lappies:72828] [ 6] 0 libsystem_malloc.dylib 0x00007fff57f5a256 free_tiny + 628
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 0 on node Lappies exited on signal 11 (Segmentation fault: 11).
--------------------------------------------------------------------------
As you can see, the crash happens inside Boost.MPI's iprobe. It sometimes occurs in Secondary::recvMessages and sometimes in Secondary::checkQuit, with roughly equal frequency.
Is the problem in my code? Or is there a bug in Boost MPI, and if so, how can I work around it?
Either way, I'm at a loss as to how to fix this.
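For context, this is roughly how everything is launched. The sketch below is simplified from my real main.cpp (argument parsing and job construction are elided, and the exact calls may differ slightly):

#include <boost/mpi.hpp>
#include "Primary.hpp"
#include "Secondary.hpp"

namespace mpi = boost::mpi;

/// Simplified sketch of the entry point: rank 0 runs the Primary (which also
/// starts a local Secondary on a std::thread inside runJobs(), via runWorker()),
/// and every other rank runs a Secondary until it receives QUIT.
int main(int argc, char* argv[]) {
    mpi::environment env(argc, argv);   /// default-constructed environment in this sketch
    mpi::communicator world;
    if (world.rank() == 0) {
        Primary primary;
        /// ... primary.addJob(...) calls elided ...
        primary.runJobs();
    } else {
        Secondary worker(world.rank());
        worker.run();
    }
    return 0;
}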
Secondary.hpp
namespace mpi = boost::mpi;

class Secondary {
    mpi::communicator world;
    volatile bool quit;
    std::list<WorkItem*> queue;
    std::list<mpi::request> sends;
    int rank;

    void resolveSends(){
        if (!sends.empty()){
            sends.remove_if([](mpi::request req){ return req.test(); });
        }
    }

public:
    explicit Secondary(int rank) : rank(rank), quit(false) {}

    void dowork(){
        /// take everything from the queue and do the work
        if (!queue.empty()){ /// Do work
            WorkItem* pwi = queue.front();
            queue.pop_front();
            std::list<WorkItem*> l = pwi->work();
            for (auto& i : l){
                queue.push_back(i);
            }
            ReturnResult rr = ReturnResult(rank, queue.size());
            std::cout << " Secondary " << rank << " workleft=" << queue.size()
                      << " isend (" << pwi->getId() << " ) " << rr.workerid << " : " << rr.remaining << std::endl;
            sends.push_back(world.isend(0, TagType::WORK_STATUS, rr.remaining));
            checkQuit(); /// make sure we respond to quits
            delete pwi;
        }
        resolveSends();
    }

    void recvMessages() {
        while (world.iprobe(0, TagType::WORK)) {
            std::vector<WorkItem*> pwi;
            world.recv(0, TagType::WORK, pwi);
            std::cout << " Secondary " << world.rank() << " found (" << pwi.size() << " ) " << std::endl;
            for (auto& i : pwi){
                queue.push_back(i);
            }
        }
    }

    bool checkQuit() {
        if (world.iprobe(0, TagType::QUIT)) {
            std::cout << " Secondary " << world.rank() << " found QUIT " << std::endl;
            world.recv(0, TagType::QUIT);
            quit = true;
            return true;
        }
        return false;
    }

    void run() {
        /// for some reason going through this loop will often cause iprobe crashes
        while (!quit) {
            recvMessages();   /// check for more workitems
            dowork();         /// do the work
            if (checkQuit()){ /// check for quit
                resolveSends();
                break;
            }
            /// Yield is insufficient
            std::this_thread::yield();
        }
    }
};
Primary.hpp
namespace mpi = boost::mpi;

class Primary {
    mpi::communicator world;
    JobHandler jh;
    std::list<mpi::request> sends;

    void resolveSends(){
        if (!sends.empty()) {
            sends.remove_if([](mpi::request req) { return req.test(); });
        }
    }

public:
    Primary() : jh(world.size()){}

    static void runWorker(){
        Secondary worker(0);
        worker.run();
    }

    void runJobs(){
        std::vector<int> workerIds(world.size());
        std::iota(workerIds.begin(), workerIds.end(), 0);
        const int nworkers = static_cast<int>(workerIds.size());
        std::thread workerThread = std::thread(runWorker);

        std::list<WorkItem*> allitems = jh.getAllItems();
        int sendto;
        while (true) {
            /// Send all of our items
            if (!allitems.empty()){
                std::vector<WorkItem*> v{ std::begin(allitems), std::end(allitems) };
                for (int i=0, sendto=1; i < nworkers; ++i, ++sendto){
                    std::vector<WorkItem*> send = vecutil::split(v, nworkers, i);
                    std::cout << " >>> " << workerIds[sendto % nworkers] << " " << send.size() << " " << allitems.size() << std::endl;
                    sends.push_back(world.isend(workerIds[sendto % nworkers], TagType::WORK, send));
                    jh.sendingWorkTo(sendto, send.size());
                }
            }
            resolveSends();
            std::this_thread::yield();
            std::for_each(allitems.begin(), allitems.end(), vecutil::DeleteVector<WorkItem*>());
            allitems.clear();

            /// Check for done items
            for (auto& i : workerIds) {
                while (world.iprobe(i, TagType::WORK_STATUS)) {
                    int r;
                    mpi::request req = world.irecv(i, TagType::WORK_STATUS, r);
                    jh.workItemComplete(ReturnResult(i, r));
                    std::cout << jh << " <<< Secondary " << i << " remaining=" << r <<
                                 " complete=" << jh.isComplete() << std::endl;
                }
            }

            /// Check for complete
            if (jh.isComplete()){
                std::cout << "====\n >>> complete " << std::endl;
                break;
            }
            std::this_thread::yield();
        }

        std::cout << "====================== sending quit " << std::endl;
        mpi::request reqs[nworkers];
        int n=0;
        for (auto& i : workerIds) {
            reqs[n++] = world.isend(i, TagType::QUIT);
        }
        mpi::wait_all(reqs, reqs + nworkers);

        std::cout << "====================== gathering " << std::endl;
        workerThread.join();
        std::cout << "====================== quitting " << std::endl;
    }

    void addJob(Job* pjob) {
        jh.addJob(pjob);
    }
};
I can provide any additional code that is needed, but I believe this is the relevant section.
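For reference, the supporting declarations look roughly like this (heavily simplified; the real WorkItem hierarchy, JobHandler, and the serialization boilerplate are longer):

#include <boost/serialization/access.hpp>
#include <list>

/// Plain enum so the values convert to int MPI tags.
enum TagType { WORK, WORK_STATUS, QUIT };

/// What a Secondary reports back after processing an item.
struct ReturnResult {
    int workerid;
    int remaining;
    ReturnResult(int workerid, int remaining)
        : workerid(workerid), remaining(remaining) {}
};

/// Base class for units of work; work() may spawn further items.
/// Concrete subclasses are serializable (and exported) so that a
/// std::vector<WorkItem*> can be sent through boost::mpi.
class WorkItem {
public:
    virtual ~WorkItem() = default;
    virtual std::list<WorkItem*> work() = 0;
    virtual int getId() const = 0;
private:
    friend class boost::serialization::access;
    template <class Archive>
    void serialize(Archive& ar, unsigned int /*version*/) { /* members elided */ }
};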